1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "os" 27 "reflect" 28 "strings" 29 "sync" 30 "testing" 31 "time" 32 33 "cloud.google.com/go/civil" 34 "cloud.google.com/go/internal/testutil" 35 "cloud.google.com/go/internal/uid" 36 database "cloud.google.com/go/spanner/admin/database/apiv1" 37 instance "cloud.google.com/go/spanner/admin/instance/apiv1" 38 "google.golang.org/api/iterator" 39 "google.golang.org/api/option" 40 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 41 instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" 42 "google.golang.org/grpc" 43 "google.golang.org/grpc/codes" 44 "google.golang.org/grpc/status" 45) 46 47var ( 48 // testProjectID specifies the project used for testing. It can be changed 49 // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. 
50 testProjectID = testutil.ProjID() 51 52 dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 53 instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true}) 54 testInstanceID = instanceNameSpace.New() 55 56 testTable = "TestTable" 57 testTableIndex = "TestTableByValue" 58 testTableColumns = []string{"Key", "StringValue"} 59 60 databaseAdmin *database.DatabaseAdminClient 61 instanceAdmin *instance.InstanceAdminClient 62 63 singerDBStatements = []string{ 64 `CREATE TABLE Singers ( 65 SingerId INT64 NOT NULL, 66 FirstName STRING(1024), 67 LastName STRING(1024), 68 SingerInfo BYTES(MAX) 69 ) PRIMARY KEY (SingerId)`, 70 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 71 `CREATE TABLE Accounts ( 72 AccountId INT64 NOT NULL, 73 Nickname STRING(100), 74 Balance INT64 NOT NULL, 75 ) PRIMARY KEY (AccountId)`, 76 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 77 `CREATE TABLE Types ( 78 RowID INT64 NOT NULL, 79 String STRING(MAX), 80 StringArray ARRAY<STRING(MAX)>, 81 Bytes BYTES(MAX), 82 BytesArray ARRAY<BYTES(MAX)>, 83 Int64a INT64, 84 Int64Array ARRAY<INT64>, 85 Bool BOOL, 86 BoolArray ARRAY<BOOL>, 87 Float64 FLOAT64, 88 Float64Array ARRAY<FLOAT64>, 89 Date DATE, 90 DateArray ARRAY<DATE>, 91 Timestamp TIMESTAMP, 92 TimestampArray ARRAY<TIMESTAMP>, 93 ) PRIMARY KEY (RowID)`, 94 } 95 96 readDBStatements = []string{ 97 `CREATE TABLE TestTable ( 98 Key STRING(MAX) NOT NULL, 99 StringValue STRING(MAX) 100 ) PRIMARY KEY (Key)`, 101 `CREATE INDEX TestTableByValue ON TestTable(StringValue)`, 102 `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`, 103 } 104 105 simpleDBStatements = []string{ 106 `CREATE TABLE test ( 107 a STRING(1024), 108 b STRING(1024), 109 ) PRIMARY KEY (a)`, 110 } 111 simpleDBTableColumns = []string{"a", "b"} 112 113 ctsDBStatements = []string{ 114 `CREATE TABLE TestTable ( 115 Key STRING(MAX) NOT NULL, 116 Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true), 117 
) PRIMARY KEY (Key)`, 118 } 119) 120 121const ( 122 str1 = "alice" 123 str2 = "a@example.com" 124) 125 126func TestMain(m *testing.M) { 127 cleanup := initIntegrationTests() 128 res := m.Run() 129 cleanup() 130 os.Exit(res) 131} 132 133var grpcHeaderChecker = testutil.DefaultHeadersEnforcer() 134 135func initIntegrationTests() (cleanup func()) { 136 ctx := context.Background() 137 flag.Parse() // Needed for testing.Short(). 138 noop := func() {} 139 140 if testing.Short() { 141 log.Println("Integration tests skipped in -short mode.") 142 return noop 143 } 144 145 if testProjectID == "" { 146 log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 147 return noop 148 } 149 150 ts := testutil.TokenSource(ctx, AdminScope, Scope) 151 if ts == nil { 152 log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY") 153 return noop 154 } 155 var err error 156 157 opts := append(grpcHeaderChecker.CallOptions(), option.WithTokenSource(ts), option.WithEndpoint(endpoint)) 158 159 // Run integration tests against the given emulator. Currently, the database and 160 // instance admin clients are auto-generated, which do not support to configure 161 // SPANNER_EMULATOR_HOST. 162 emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST") 163 if emulatorAddr != "" { 164 opts = append( 165 grpcHeaderChecker.CallOptions(), 166 option.WithEndpoint(emulatorAddr), 167 option.WithGRPCDialOption(grpc.WithInsecure()), 168 option.WithoutAuthentication(), 169 ) 170 } 171 172 // Create InstanceAdmin and DatabaseAdmin clients. 173 instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) 174 if err != nil { 175 log.Fatalf("cannot create instance databaseAdmin client: %v", err) 176 } 177 databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...) 
178 if err != nil { 179 log.Fatalf("cannot create databaseAdmin client: %v", err) 180 } 181 // Get the list of supported instance configs for the project that is used 182 // for the integration tests. The supported instance configs can differ per 183 // project. The integration tests will use the first instance config that 184 // is returned by Cloud Spanner. This will normally be the regional config 185 // that is physically the closest to where the request is coming from. 186 configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{ 187 Parent: fmt.Sprintf("projects/%s", testProjectID), 188 }) 189 config, err := configIterator.Next() 190 if err != nil { 191 log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err) 192 } 193 // Create a test instance to use for this test run. 194 op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{ 195 Parent: fmt.Sprintf("projects/%s", testProjectID), 196 InstanceId: testInstanceID, 197 Instance: &instancepb.Instance{ 198 Config: config.Name, 199 DisplayName: testInstanceID, 200 NodeCount: 1, 201 }, 202 }) 203 if err != nil { 204 log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err) 205 } 206 // Wait for the instance creation to finish. 207 i, err := op.Wait(ctx) 208 if err != nil { 209 log.Fatalf("waiting for instance creation to finish failed: %v", err) 210 } 211 if i.State != instancepb.Instance_READY { 212 log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State) 213 } 214 215 return func() { 216 // Delete this test instance. 
217 instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID) 218 if err = instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{ 219 Name: instanceName, 220 }); err != nil { 221 log.Printf("failed to drop instance %s (error %v), might need a manual removal", 222 instanceName, err) 223 } 224 // Delete other test instances that may be lingering around. 225 cleanupInstances() 226 databaseAdmin.Close() 227 instanceAdmin.Close() 228 } 229} 230 231func TestIntegration_InitSessionPool(t *testing.T) { 232 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 233 defer cancel() 234 // Set up testing environment. 235 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 236 defer cleanup() 237 sp := client.idleSessions 238 sp.mu.Lock() 239 want := sp.MinOpened 240 sp.mu.Unlock() 241 var numOpened int 242 for { 243 select { 244 case <-ctx.Done(): 245 t.Fatalf("timed out, got %d session(s), want %d", numOpened, want) 246 default: 247 sp.mu.Lock() 248 numOpened = sp.idleList.Len() + sp.idleWriteList.Len() 249 sp.mu.Unlock() 250 if uint64(numOpened) == want { 251 return 252 } 253 } 254 } 255} 256 257// Test SingleUse transaction. 258func TestIntegration_SingleUse(t *testing.T) { 259 t.Parallel() 260 261 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 262 defer cancel() 263 // Set up testing environment. 264 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 265 defer cleanup() 266 267 writes := []struct { 268 row []interface{} 269 ts time.Time 270 }{ 271 {row: []interface{}{1, "Marc", "Foo"}}, 272 {row: []interface{}{2, "Tars", "Bar"}}, 273 {row: []interface{}{3, "Alpha", "Beta"}}, 274 {row: []interface{}{4, "Last", "End"}}, 275 } 276 // Try to write four rows through the Apply API. 
277 for i, w := range writes { 278 var err error 279 m := InsertOrUpdate("Singers", 280 []string{"SingerId", "FirstName", "LastName"}, 281 w.row) 282 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 283 t.Fatal(err) 284 } 285 } 286 // Calculate time difference between Cloud Spanner server and localhost to 287 // use to determine the exact staleness value to use. 288 timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0) 289 290 // Test reading rows with different timestamp bounds. 291 for i, test := range []struct { 292 name string 293 want [][]interface{} 294 tb TimestampBound 295 checkTs func(time.Time) error 296 }{ 297 { 298 name: "strong", 299 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 300 tb: StrongRead(), 301 checkTs: func(ts time.Time) error { 302 // writes[3] is the last write, all subsequent strong read 303 // should have a timestamp larger than that. 304 if ts.Before(writes[3].ts) { 305 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 306 } 307 return nil 308 }, 309 }, 310 { 311 name: "min_read_timestamp", 312 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 313 tb: MinReadTimestamp(writes[3].ts), 314 checkTs: func(ts time.Time) error { 315 if ts.Before(writes[3].ts) { 316 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 317 } 318 return nil 319 }, 320 }, 321 { 322 name: "max_staleness", 323 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 324 tb: MaxStaleness(time.Second), 325 checkTs: func(ts time.Time) error { 326 if ts.Before(writes[3].ts) { 327 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 328 } 329 return nil 330 }, 331 }, 332 { 333 name: "read_timestamp", 334 want: [][]interface{}{{int64(1), 
"Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 335 tb: ReadTimestamp(writes[2].ts), 336 checkTs: func(ts time.Time) error { 337 if ts != writes[2].ts { 338 return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts) 339 } 340 return nil 341 }, 342 }, 343 { 344 name: "exact_staleness", 345 want: nil, 346 // Specify a staleness which should be already before this test. 347 tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second), 348 checkTs: func(ts time.Time) error { 349 if !ts.Before(writes[0].ts) { 350 return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts) 351 } 352 return nil 353 }, 354 }, 355 } { 356 t.Run(test.name, func(t *testing.T) { 357 // SingleUse.Query 358 su := client.Single().WithTimestampBound(test.tb) 359 got, err := readAll(su.Query( 360 ctx, 361 Statement{ 362 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 363 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 364 })) 365 if err != nil { 366 t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) 367 } 368 rts, err := su.Timestamp() 369 if err != nil { 370 t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) 371 } 372 if err := test.checkTs(rts); err != nil { 373 t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err) 374 } 375 if !testEqual(got, test.want) { 376 t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want) 377 } 378 // SingleUse.Read 379 su = client.Single().WithTimestampBound(test.tb) 380 got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 381 if err != nil { 382 t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) 383 } 384 rts, err = su.Timestamp() 385 if err != nil { 386 t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) 387 } 388 if err := 
test.checkTs(rts); err != nil { 389 t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err) 390 } 391 if !testEqual(got, test.want) { 392 t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want) 393 } 394 // SingleUse.ReadRow 395 got = nil 396 for _, k := range []Key{{1}, {3}, {4}} { 397 su = client.Single().WithTimestampBound(test.tb) 398 r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 399 if err != nil { 400 continue 401 } 402 v, err := rowToValues(r) 403 if err != nil { 404 continue 405 } 406 got = append(got, v) 407 rts, err = su.Timestamp() 408 if err != nil { 409 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 410 } 411 if err := test.checkTs(rts); err != nil { 412 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 413 } 414 } 415 if !testEqual(got, test.want) { 416 t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want) 417 } 418 // SingleUse.ReadUsingIndex 419 su = client.Single().WithTimestampBound(test.tb) 420 got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 421 if err != nil { 422 t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) 423 } 424 // The results from ReadUsingIndex is sorted by the index rather than primary key. 
425 if len(got) != len(test.want) { 426 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 427 } 428 for j, g := range got { 429 if j > 0 { 430 prev := got[j-1][1].(string) + got[j-1][2].(string) 431 curr := got[j][1].(string) + got[j][2].(string) 432 if strings.Compare(prev, curr) > 0 { 433 t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 434 } 435 } 436 found := false 437 for _, w := range test.want { 438 if testEqual(g, w) { 439 found = true 440 } 441 } 442 if !found { 443 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 444 break 445 } 446 } 447 rts, err = su.Timestamp() 448 if err != nil { 449 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 450 } 451 if err := test.checkTs(rts); err != nil { 452 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 453 } 454 // SingleUse.ReadRowUsingIndex 455 got = nil 456 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 457 su = client.Single().WithTimestampBound(test.tb) 458 r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 459 if err != nil { 460 continue 461 } 462 v, err := rowToValues(r) 463 if err != nil { 464 continue 465 } 466 got = append(got, v) 467 rts, err = su.Timestamp() 468 if err != nil { 469 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 470 } 471 if err := test.checkTs(rts); err != nil { 472 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 473 } 474 } 475 if !testEqual(got, test.want) { 476 t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want) 477 } 478 }) 479 } 480} 481 482// Test resource-based routing enabled. 
483func TestIntegration_SingleUse_WithResourceBasedRouting(t *testing.T) { 484 os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true") 485 defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "") 486 487 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 488 defer cancel() 489 // Set up testing environment. 490 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 491 defer cleanup() 492 493 writes := []struct { 494 row []interface{} 495 ts time.Time 496 }{ 497 {row: []interface{}{1, "Marc", "Foo"}}, 498 {row: []interface{}{2, "Tars", "Bar"}}, 499 {row: []interface{}{3, "Alpha", "Beta"}}, 500 {row: []interface{}{4, "Last", "End"}}, 501 } 502 // Try to write four rows through the Apply API. 503 for i, w := range writes { 504 var err error 505 m := InsertOrUpdate("Singers", 506 []string{"SingerId", "FirstName", "LastName"}, 507 w.row) 508 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 509 t.Fatal(err) 510 } 511 } 512 513 row, err := client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 514 if err != nil { 515 t.Errorf("SingleUse.ReadRow returns error %v, want nil", err) 516 } 517 var got string 518 if err := row.Column(0, &got); err != nil { 519 t.Errorf("row.Column returns error %v, want nil", err) 520 } 521 if want := "Alpha"; got != want { 522 t.Errorf("got %q, want %q", got, want) 523 } 524} 525 526func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) { 527 t.Parallel() 528 529 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 530 defer cancel() 531 // Set up testing environment. 
532 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 533 defer cleanup() 534 535 writes := []struct { 536 row []interface{} 537 ts time.Time 538 }{ 539 {row: []interface{}{1, "Marc", "Foo"}}, 540 {row: []interface{}{2, "Tars", "Bar"}}, 541 {row: []interface{}{3, "Alpha", "Beta"}}, 542 {row: []interface{}{4, "Last", "End"}}, 543 } 544 // Try to write four rows through the Apply API. 545 for i, w := range writes { 546 var err error 547 m := InsertOrUpdate("Singers", 548 []string{"SingerId", "FirstName", "LastName"}, 549 w.row) 550 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 551 t.Fatal(err) 552 } 553 } 554 555 su := client.Single() 556 const limit = 1 557 gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), 558 []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit})) 559 if err != nil { 560 t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err) 561 } 562 if got, want := len(gotRows), limit; got != want { 563 t.Errorf("got %d, want %d", got, want) 564 } 565} 566 567// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it 568// also tests for a single timestamp across multiple reads. 569func TestIntegration_ReadOnlyTransaction(t *testing.T) { 570 t.Parallel() 571 572 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 573 defer cancel() 574 // Set up testing environment. 575 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 576 defer cleanup() 577 578 writes := []struct { 579 row []interface{} 580 ts time.Time 581 }{ 582 {row: []interface{}{1, "Marc", "Foo"}}, 583 {row: []interface{}{2, "Tars", "Bar"}}, 584 {row: []interface{}{3, "Alpha", "Beta"}}, 585 {row: []interface{}{4, "Last", "End"}}, 586 } 587 // Try to write four rows through the Apply API. 
588 for i, w := range writes { 589 var err error 590 m := InsertOrUpdate("Singers", 591 []string{"SingerId", "FirstName", "LastName"}, 592 w.row) 593 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 594 t.Fatal(err) 595 } 596 } 597 598 // For testing timestamp bound staleness. 599 <-time.After(time.Second) 600 601 // Test reading rows with different timestamp bounds. 602 for i, test := range []struct { 603 want [][]interface{} 604 tb TimestampBound 605 checkTs func(time.Time) error 606 }{ 607 // Note: min_read_timestamp and max_staleness are not supported by 608 // ReadOnlyTransaction. See API document for more details. 609 { 610 // strong 611 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 612 StrongRead(), 613 func(ts time.Time) error { 614 if ts.Before(writes[3].ts) { 615 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 616 } 617 return nil 618 }, 619 }, 620 { 621 // read_timestamp 622 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 623 ReadTimestamp(writes[2].ts), 624 func(ts time.Time) error { 625 if ts != writes[2].ts { 626 return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts) 627 } 628 return nil 629 }, 630 }, 631 { 632 // exact_staleness 633 nil, 634 // Specify a staleness which should be already before this test 635 // because context timeout is set to be 10s. 
636 ExactStaleness(11 * time.Second), 637 func(ts time.Time) error { 638 if ts.After(writes[0].ts) { 639 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 640 } 641 return nil 642 }, 643 }, 644 } { 645 // ReadOnlyTransaction.Query 646 ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb) 647 got, err := readAll(ro.Query( 648 ctx, 649 Statement{ 650 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 651 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 652 })) 653 if err != nil { 654 t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err) 655 } 656 if !testEqual(got, test.want) { 657 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want) 658 } 659 rts, err := ro.Timestamp() 660 if err != nil { 661 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err) 662 } 663 if err := test.checkTs(rts); err != nil { 664 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err) 665 } 666 roTs := rts 667 // ReadOnlyTransaction.Read 668 got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 669 if err != nil { 670 t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err) 671 } 672 if !testEqual(got, test.want) { 673 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want) 674 } 675 rts, err = ro.Timestamp() 676 if err != nil { 677 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err) 678 } 679 if err := test.checkTs(rts); err != nil { 680 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err) 681 } 682 if roTs != rts { 683 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, 
rts) 684 } 685 // ReadOnlyTransaction.ReadRow 686 got = nil 687 for _, k := range []Key{{1}, {3}, {4}} { 688 r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 689 if err != nil { 690 continue 691 } 692 v, err := rowToValues(r) 693 if err != nil { 694 continue 695 } 696 got = append(got, v) 697 rts, err = ro.Timestamp() 698 if err != nil { 699 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 700 } 701 if err := test.checkTs(rts); err != nil { 702 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 703 } 704 if roTs != rts { 705 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 706 } 707 } 708 if !testEqual(got, test.want) { 709 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want) 710 } 711 // SingleUse.ReadUsingIndex 712 got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 713 if err != nil { 714 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err) 715 } 716 // The results from ReadUsingIndex is sorted by the index rather than 717 // primary key. 
718 if len(got) != len(test.want) { 719 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 720 } 721 for j, g := range got { 722 if j > 0 { 723 prev := got[j-1][1].(string) + got[j-1][2].(string) 724 curr := got[j][1].(string) + got[j][2].(string) 725 if strings.Compare(prev, curr) > 0 { 726 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 727 } 728 } 729 found := false 730 for _, w := range test.want { 731 if testEqual(g, w) { 732 found = true 733 } 734 } 735 if !found { 736 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 737 break 738 } 739 } 740 rts, err = ro.Timestamp() 741 if err != nil { 742 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 743 } 744 if err := test.checkTs(rts); err != nil { 745 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 746 } 747 if roTs != rts { 748 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 749 } 750 // ReadOnlyTransaction.ReadRowUsingIndex 751 got = nil 752 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 753 r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 754 if err != nil { 755 continue 756 } 757 v, err := rowToValues(r) 758 if err != nil { 759 continue 760 } 761 got = append(got, v) 762 rts, err = ro.Timestamp() 763 if err != nil { 764 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 765 } 766 if err := test.checkTs(rts); err != nil { 767 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 768 } 769 if roTs != rts { 770 t.Errorf("%d: got two read 
timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 771 } 772 } 773 if !testEqual(got, test.want) { 774 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want) 775 } 776 ro.Close() 777 } 778} 779 780// Test ReadOnlyTransaction with different timestamp bound when there's an 781// update at the same time. 782func TestIntegration_UpdateDuringRead(t *testing.T) { 783 t.Parallel() 784 785 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 786 defer cancel() 787 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 788 defer cleanup() 789 790 for i, tb := range []TimestampBound{ 791 StrongRead(), 792 ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour 793 ExactStaleness(time.Minute * 30), 794 } { 795 ro := client.ReadOnlyTransaction().WithTimestampBound(tb) 796 _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 797 if ErrCode(err) != codes.NotFound { 798 t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err) 799 } 800 801 m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i}) 802 if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 803 t.Fatal(err) 804 } 805 806 _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 807 if ErrCode(err) != codes.NotFound { 808 t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err) 809 } 810 } 811} 812 813// Test ReadWriteTransaction. 814func TestIntegration_ReadWriteTransaction(t *testing.T) { 815 t.Parallel() 816 817 // Give a longer deadline because of transaction backoffs. 
818 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 819 defer cancel() 820 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 821 defer cleanup() 822 823 // Set up two accounts 824 accounts := []*Mutation{ 825 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 826 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 827 } 828 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 829 t.Fatal(err) 830 } 831 wg := sync.WaitGroup{} 832 833 readBalance := func(iter *RowIterator) (int64, error) { 834 defer iter.Stop() 835 var bal int64 836 for { 837 row, err := iter.Next() 838 if err == iterator.Done { 839 return bal, nil 840 } 841 if err != nil { 842 return 0, err 843 } 844 if err := row.Column(0, &bal); err != nil { 845 return 0, err 846 } 847 } 848 } 849 850 for i := 0; i < 20; i++ { 851 wg.Add(1) 852 go func(iter int) { 853 defer wg.Done() 854 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 855 // Query Foo's balance and Bar's balance. 856 bf, e := readBalance(tx.Query(ctx, 857 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 858 if e != nil { 859 return e 860 } 861 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 862 if e != nil { 863 return e 864 } 865 if bf <= 0 { 866 return nil 867 } 868 bf-- 869 bb++ 870 return tx.BufferWrite([]*Mutation{ 871 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 872 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 873 }) 874 }) 875 if err != nil { 876 t.Errorf("%d: failed to execute transaction: %v", iter, err) 877 } 878 }(i) 879 } 880 // Because of context timeout, all goroutines will eventually return. 
881 wg.Wait() 882 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 883 var bf, bb int64 884 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"}) 885 if e != nil { 886 return e 887 } 888 if ce := r.Column(0, &bf); ce != nil { 889 return ce 890 } 891 bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"})) 892 if e != nil { 893 return e 894 } 895 if bf != 30 || bb != 21 { 896 t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) 897 } 898 return nil 899 }) 900 if err != nil { 901 t.Errorf("failed to check balances: %v", err) 902 } 903} 904 905func TestIntegration_Reads(t *testing.T) { 906 t.Parallel() 907 908 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 909 defer cancel() 910 // Set up testing environment. 911 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 912 defer cleanup() 913 914 // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2". 915 var ms []*Mutation 916 for i := 0; i < 15; i++ { 917 ms = append(ms, InsertOrUpdate(testTable, 918 testTableColumns, 919 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 920 } 921 // Don't use ApplyAtLeastOnce, so we can test the other code path. 922 if _, err := client.Apply(ctx, ms); err != nil { 923 t.Fatal(err) 924 } 925 926 // Empty read. 927 rows, err := readAllTestTable(client.Single().Read(ctx, testTable, 928 KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns)) 929 if err != nil { 930 t.Fatal(err) 931 } 932 if got, want := len(rows), 0; got != want { 933 t.Errorf("got %d, want %d", got, want) 934 } 935 936 // Index empty read. 
937 rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, 938 KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns)) 939 if err != nil { 940 t.Fatal(err) 941 } 942 if got, want := len(rows), 0; got != want { 943 t.Errorf("got %d, want %d", got, want) 944 } 945 946 // Point read. 947 row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns) 948 if err != nil { 949 t.Fatal(err) 950 } 951 var got testTableRow 952 if err := row.ToStruct(&got); err != nil { 953 t.Fatal(err) 954 } 955 if want := (testTableRow{"k1", "v1"}); got != want { 956 t.Errorf("got %v, want %v", got, want) 957 } 958 959 // Point read not found. 960 _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns) 961 if ErrCode(err) != codes.NotFound { 962 t.Fatalf("got %v, want NotFound", err) 963 } 964 965 // Index point read. 966 rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns) 967 if err != nil { 968 t.Fatal(err) 969 } 970 var gotIndex testTableRow 971 if err := rowIndex.ToStruct(&gotIndex); err != nil { 972 t.Fatal(err) 973 } 974 if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex { 975 t.Errorf("got %v, want %v", gotIndex, wantIndex) 976 } 977 // Index point read not found. 978 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns) 979 if ErrCode(err) != codes.NotFound { 980 t.Fatalf("got %v, want NotFound", err) 981 } 982 rangeReads(ctx, t, client) 983 indexRangeReads(ctx, t, client) 984} 985 986func TestIntegration_EarlyTimestamp(t *testing.T) { 987 t.Parallel() 988 989 // Test that we can get the timestamp from a read-only transaction as 990 // soon as we have read at least one row. 991 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 992 defer cancel() 993 // Set up testing environment. 
994 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 995 defer cleanup() 996 997 var ms []*Mutation 998 for i := 0; i < 3; i++ { 999 ms = append(ms, InsertOrUpdate(testTable, 1000 testTableColumns, 1001 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1002 } 1003 if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil { 1004 t.Fatal(err) 1005 } 1006 1007 txn := client.Single() 1008 iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1009 defer iter.Stop() 1010 // In single-use transaction, we should get an error before reading anything. 1011 if _, err := txn.Timestamp(); err == nil { 1012 t.Error("wanted error, got nil") 1013 } 1014 // After reading one row, the timestamp should be available. 1015 _, err := iter.Next() 1016 if err != nil { 1017 t.Fatal(err) 1018 } 1019 if _, err := txn.Timestamp(); err != nil { 1020 t.Errorf("got %v, want nil", err) 1021 } 1022 1023 txn = client.ReadOnlyTransaction() 1024 defer txn.Close() 1025 iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1026 defer iter.Stop() 1027 // In an ordinary read-only transaction, the timestamp should be 1028 // available immediately. 1029 if _, err := txn.Timestamp(); err != nil { 1030 t.Errorf("got %v, want nil", err) 1031 } 1032} 1033 1034func TestIntegration_NestedTransaction(t *testing.T) { 1035 t.Parallel() 1036 1037 // You cannot use a transaction from inside a read-write transaction. 
1038 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1039 defer cancel() 1040 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1041 defer cleanup() 1042 1043 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1044 _, err := client.ReadWriteTransaction(ctx, 1045 func(context.Context, *ReadWriteTransaction) error { return nil }) 1046 if ErrCode(err) != codes.FailedPrecondition { 1047 t.Fatalf("got %v, want FailedPrecondition", err) 1048 } 1049 _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1050 if ErrCode(err) != codes.FailedPrecondition { 1051 t.Fatalf("got %v, want FailedPrecondition", err) 1052 } 1053 rot := client.ReadOnlyTransaction() 1054 defer rot.Close() 1055 _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1056 if ErrCode(err) != codes.FailedPrecondition { 1057 t.Fatalf("got %v, want FailedPrecondition", err) 1058 } 1059 return nil 1060 }) 1061 if err != nil { 1062 t.Fatal(err) 1063 } 1064} 1065 1066// Test client recovery on database recreation. 1067func TestIntegration_DbRemovalRecovery(t *testing.T) { 1068 t.Parallel() 1069 1070 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1071 defer cancel() 1072 // Create a client with MinOpened=0 to prevent the session pool maintainer 1073 // from repeatedly trying to create sessions for the invalid database. 1074 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements) 1075 defer cleanup() 1076 1077 // Drop the testing database. 1078 if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 1079 t.Fatalf("failed to drop testing database %v: %v", dbPath, err) 1080 } 1081 1082 // Now, send the query. 
1083 iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1084 defer iter.Stop() 1085 if _, err := iter.Next(); err == nil { 1086 t.Errorf("client sends query to removed database successfully, want it to fail") 1087 } 1088 1089 // Recreate database and table. 1090 dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] 1091 op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 1092 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 1093 CreateStatement: "CREATE DATABASE " + dbName, 1094 ExtraStatements: []string{ 1095 `CREATE TABLE Singers ( 1096 SingerId INT64 NOT NULL, 1097 FirstName STRING(1024), 1098 LastName STRING(1024), 1099 SingerInfo BYTES(MAX) 1100 ) PRIMARY KEY (SingerId)`, 1101 }, 1102 }) 1103 if err != nil { 1104 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1105 } 1106 if _, err := op.Wait(ctx); err != nil { 1107 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1108 } 1109 1110 // Now, send the query again. 1111 iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1112 defer iter.Stop() 1113 _, err = iter.Next() 1114 if err != nil && err != iterator.Done { 1115 t.Errorf("failed to send query to database %v: %v", dbPath, err) 1116 } 1117} 1118 1119// Test encoding/decoding non-struct Cloud Spanner types. 
1120func TestIntegration_BasicTypes(t *testing.T) { 1121 t.Parallel() 1122 1123 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1124 defer cancel() 1125 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1126 defer cleanup() 1127 1128 t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z") 1129 // Boundaries 1130 t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z") 1131 t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z") 1132 d1, _ := civil.ParseDate("2016-11-15") 1133 // Boundaries 1134 d2, _ := civil.ParseDate("0001-01-01") 1135 d3, _ := civil.ParseDate("9999-12-31") 1136 1137 tests := []struct { 1138 col string 1139 val interface{} 1140 want interface{} 1141 }{ 1142 {col: "String", val: ""}, 1143 {col: "String", val: "", want: NullString{"", true}}, 1144 {col: "String", val: "foo"}, 1145 {col: "String", val: "foo", want: NullString{"foo", true}}, 1146 {col: "String", val: NullString{"bar", true}, want: "bar"}, 1147 {col: "String", val: NullString{"bar", false}, want: NullString{"", false}}, 1148 {col: "String", val: nil, want: NullString{}}, 1149 {col: "StringArray", val: []string(nil), want: []NullString(nil)}, 1150 {col: "StringArray", val: []string{}, want: []NullString{}}, 1151 {col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}}, 1152 {col: "StringArray", val: []NullString(nil)}, 1153 {col: "StringArray", val: []NullString{}}, 1154 {col: "StringArray", val: []NullString{{"foo", true}, {}}}, 1155 {col: "Bytes", val: []byte{}}, 1156 {col: "Bytes", val: []byte{1, 2, 3}}, 1157 {col: "Bytes", val: []byte(nil)}, 1158 {col: "BytesArray", val: [][]byte(nil)}, 1159 {col: "BytesArray", val: [][]byte{}}, 1160 {col: "BytesArray", val: [][]byte{{1}, {2, 3}}}, 1161 {col: "Int64a", val: 0, want: int64(0)}, 1162 {col: "Int64a", val: -1, want: int64(-1)}, 1163 {col: "Int64a", val: 2, want: int64(2)}, 
1164 {col: "Int64a", val: int64(3)}, 1165 {col: "Int64a", val: 4, want: NullInt64{4, true}}, 1166 {col: "Int64a", val: NullInt64{5, true}, want: int64(5)}, 1167 {col: "Int64a", val: NullInt64{6, true}, want: int64(6)}, 1168 {col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}}, 1169 {col: "Int64a", val: nil, want: NullInt64{}}, 1170 {col: "Int64Array", val: []int(nil), want: []NullInt64(nil)}, 1171 {col: "Int64Array", val: []int{}, want: []NullInt64{}}, 1172 {col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1173 {col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)}, 1174 {col: "Int64Array", val: []int64{}, want: []NullInt64{}}, 1175 {col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1176 {col: "Int64Array", val: []NullInt64(nil)}, 1177 {col: "Int64Array", val: []NullInt64{}}, 1178 {col: "Int64Array", val: []NullInt64{{1, true}, {}}}, 1179 {col: "Bool", val: false}, 1180 {col: "Bool", val: true}, 1181 {col: "Bool", val: false, want: NullBool{false, true}}, 1182 {col: "Bool", val: true, want: NullBool{true, true}}, 1183 {col: "Bool", val: NullBool{true, true}}, 1184 {col: "Bool", val: NullBool{false, false}}, 1185 {col: "Bool", val: nil, want: NullBool{}}, 1186 {col: "BoolArray", val: []bool(nil), want: []NullBool(nil)}, 1187 {col: "BoolArray", val: []bool{}, want: []NullBool{}}, 1188 {col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}}, 1189 {col: "BoolArray", val: []NullBool(nil)}, 1190 {col: "BoolArray", val: []NullBool{}}, 1191 {col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}}, 1192 {col: "Float64", val: 0.0}, 1193 {col: "Float64", val: 3.14}, 1194 {col: "Float64", val: math.NaN()}, 1195 {col: "Float64", val: math.Inf(1)}, 1196 {col: "Float64", val: math.Inf(-1)}, 1197 {col: "Float64", val: 2.78, want: NullFloat64{2.78, true}}, 1198 {col: "Float64", val: NullFloat64{2.71, true}, want: 2.71}, 1199 {col: "Float64", val: 
NullFloat64{1.41, true}, want: NullFloat64{1.41, true}}, 1200 {col: "Float64", val: NullFloat64{0, false}}, 1201 {col: "Float64", val: nil, want: NullFloat64{}}, 1202 {col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)}, 1203 {col: "Float64Array", val: []float64{}, want: []NullFloat64{}}, 1204 {col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}}, 1205 {col: "Float64Array", val: []NullFloat64(nil)}, 1206 {col: "Float64Array", val: []NullFloat64{}}, 1207 {col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}}, 1208 {col: "Date", val: d1}, 1209 {col: "Date", val: d1, want: NullDate{d1, true}}, 1210 {col: "Date", val: NullDate{d1, true}}, 1211 {col: "Date", val: NullDate{d1, true}, want: d1}, 1212 {col: "Date", val: NullDate{civil.Date{}, false}}, 1213 {col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)}, 1214 {col: "DateArray", val: []civil.Date{}, want: []NullDate{}}, 1215 {col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}}, 1216 {col: "Timestamp", val: t1}, 1217 {col: "Timestamp", val: t1, want: NullTime{t1, true}}, 1218 {col: "Timestamp", val: NullTime{t1, true}}, 1219 {col: "Timestamp", val: NullTime{t1, true}, want: t1}, 1220 {col: "Timestamp", val: NullTime{}}, 1221 {col: "Timestamp", val: nil, want: NullTime{}}, 1222 {col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)}, 1223 {col: "TimestampArray", val: []time.Time{}, want: []NullTime{}}, 1224 {col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}}, 1225 } 1226 1227 // Write rows into table first. 
1228 var muts []*Mutation 1229 for i, test := range tests { 1230 muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val})) 1231 } 1232 if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil { 1233 t.Fatal(err) 1234 } 1235 1236 for i, test := range tests { 1237 row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col}) 1238 if err != nil { 1239 t.Fatalf("Unable to fetch row %v: %v", i, err) 1240 } 1241 // Create new instance of type of test.want. 1242 want := test.want 1243 if want == nil { 1244 want = test.val 1245 } 1246 gotp := reflect.New(reflect.TypeOf(want)) 1247 if err := row.Column(0, gotp.Interface()); err != nil { 1248 t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err) 1249 continue 1250 } 1251 got := reflect.Indirect(gotp).Interface() 1252 1253 // One of the test cases is checking NaN handling. Given 1254 // NaN!=NaN, we can't use reflect to test for it. 1255 if isNaN(got) && isNaN(want) { 1256 continue 1257 } 1258 1259 // Check non-NaN cases. 1260 if !testEqual(got, want) { 1261 t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want) 1262 continue 1263 } 1264 } 1265} 1266 1267// Test decoding Cloud Spanner STRUCT type. 1268func TestIntegration_StructTypes(t *testing.T) { 1269 t.Parallel() 1270 1271 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1272 defer cancel() 1273 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1274 defer cleanup() 1275 1276 tests := []struct { 1277 q Statement 1278 want func(r *Row) error 1279 }{ 1280 { 1281 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`}, 1282 want: func(r *Row) error { 1283 // Test STRUCT ARRAY decoding to []NullRow. 
1284 var rows []NullRow 1285 if err := r.Column(0, &rows); err != nil { 1286 return err 1287 } 1288 if len(rows) != 1 { 1289 return fmt.Errorf("len(rows) = %d; want 1", len(rows)) 1290 } 1291 if !rows[0].Valid { 1292 return fmt.Errorf("rows[0] is NULL") 1293 } 1294 var i, j int64 1295 if err := rows[0].Row.Columns(&i, &j); err != nil { 1296 return err 1297 } 1298 if i != 1 || j != 2 { 1299 return fmt.Errorf("got (%d,%d), want (1,2)", i, j) 1300 } 1301 return nil 1302 }, 1303 }, 1304 { 1305 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`}, 1306 want: func(r *Row) error { 1307 // Test Row.ToStruct. 1308 s := struct { 1309 Col1 []*struct { 1310 Foo int64 `spanner:"foo"` 1311 Bar int64 `spanner:"bar"` 1312 } `spanner:"col1"` 1313 }{} 1314 if err := r.ToStruct(&s); err != nil { 1315 return err 1316 } 1317 want := struct { 1318 Col1 []*struct { 1319 Foo int64 `spanner:"foo"` 1320 Bar int64 `spanner:"bar"` 1321 } `spanner:"col1"` 1322 }{ 1323 Col1: []*struct { 1324 Foo int64 `spanner:"foo"` 1325 Bar int64 `spanner:"bar"` 1326 }{ 1327 { 1328 Foo: 1, 1329 Bar: 2, 1330 }, 1331 }, 1332 } 1333 if !testEqual(want, s) { 1334 return fmt.Errorf("unexpected decoding result: %v, want %v", s, want) 1335 } 1336 return nil 1337 }, 1338 }, 1339 } 1340 for i, test := range tests { 1341 iter := client.Single().Query(ctx, test.q) 1342 defer iter.Stop() 1343 row, err := iter.Next() 1344 if err != nil { 1345 t.Errorf("%d: %v", i, err) 1346 continue 1347 } 1348 if err := test.want(row); err != nil { 1349 t.Errorf("%d: %v", i, err) 1350 continue 1351 } 1352 } 1353} 1354 1355func TestIntegration_StructParametersUnsupported(t *testing.T) { 1356 skipEmulatorTest(t) 1357 t.Parallel() 1358 1359 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1360 defer cancel() 1361 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1362 defer cleanup() 1363 1364 for _, test := range []struct { 1365 param interface{} 1366 wantCode 
codes.Code 1367 wantMsgPart string 1368 }{ 1369 { 1370 struct { 1371 Field int 1372 }{10}, 1373 codes.Unimplemented, 1374 "Unsupported query shape: " + 1375 "A struct value cannot be returned as a column value. " + 1376 "Rewrite the query to flatten the struct fields in the result.", 1377 }, 1378 { 1379 []struct { 1380 Field int 1381 }{{10}, {20}}, 1382 codes.Unimplemented, 1383 "Unsupported query shape: " + 1384 "This query can return a null-valued array of struct, " + 1385 "which is not supported by Spanner.", 1386 }, 1387 } { 1388 iter := client.Single().Query(ctx, Statement{ 1389 SQL: "SELECT @p", 1390 Params: map[string]interface{}{"p": test.param}, 1391 }) 1392 _, err := iter.Next() 1393 iter.Stop() 1394 if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok { 1395 t.Fatal(msg) 1396 } 1397 } 1398} 1399 1400// Test queries of the form "SELECT expr". 1401func TestIntegration_QueryExpressions(t *testing.T) { 1402 t.Parallel() 1403 1404 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1405 defer cancel() 1406 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1407 defer cleanup() 1408 1409 newRow := func(vals []interface{}) *Row { 1410 row, err := NewRow(make([]string, len(vals)), vals) 1411 if err != nil { 1412 t.Fatal(err) 1413 } 1414 return row 1415 } 1416 1417 tests := []struct { 1418 expr string 1419 want interface{} 1420 }{ 1421 {"1", int64(1)}, 1422 {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}}, 1423 {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}}, 1424 {"IEEE_DIVIDE(1, 0)", math.Inf(1)}, 1425 {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)}, 1426 {"IEEE_DIVIDE(0, 0)", math.NaN()}, 1427 // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate. 
1428 {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}}, 1429 {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}}, 1430 {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}}, 1431 } 1432 for _, test := range tests { 1433 iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr}) 1434 defer iter.Stop() 1435 row, err := iter.Next() 1436 if err != nil { 1437 t.Errorf("%q: %v", test.expr, err) 1438 continue 1439 } 1440 // Create new instance of type of test.want. 1441 gotp := reflect.New(reflect.TypeOf(test.want)) 1442 if err := row.Column(0, gotp.Interface()); err != nil { 1443 t.Errorf("%q: Column returned error %v", test.expr, err) 1444 continue 1445 } 1446 got := reflect.Indirect(gotp).Interface() 1447 // TODO(jba): remove isNaN special case when we have a better equality predicate. 1448 if isNaN(got) && isNaN(test.want) { 1449 continue 1450 } 1451 if !testEqual(got, test.want) { 1452 t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want) 1453 } 1454 } 1455} 1456 1457func TestIntegration_QueryStats(t *testing.T) { 1458 skipEmulatorTest(t) 1459 t.Parallel() 1460 1461 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1462 defer cancel() 1463 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1464 defer cleanup() 1465 1466 accounts := []*Mutation{ 1467 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1468 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1469 } 1470 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1471 t.Fatal(err) 1472 } 1473 const sql = "SELECT Balance FROM Accounts" 1474 1475 qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil}) 1476 if err != nil { 1477 t.Fatal(err) 1478 } 1479 if 
len(qp.PlanNodes) == 0 { 1480 t.Error("got zero plan nodes, expected at least one") 1481 } 1482 1483 iter := client.Single().QueryWithStats(ctx, Statement{sql, nil}) 1484 defer iter.Stop() 1485 for { 1486 _, err := iter.Next() 1487 if err == iterator.Done { 1488 break 1489 } 1490 if err != nil { 1491 t.Fatal(err) 1492 } 1493 } 1494 if iter.QueryPlan == nil { 1495 t.Error("got nil QueryPlan, expected one") 1496 } 1497 if iter.QueryStats == nil { 1498 t.Error("got nil QueryStats, expected some") 1499 } 1500} 1501 1502func TestIntegration_InvalidDatabase(t *testing.T) { 1503 t.Parallel() 1504 1505 if databaseAdmin == nil { 1506 t.Skip("Integration tests skipped") 1507 } 1508 ctx := context.Background() 1509 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID) 1510 c, err := createClient(ctx, dbPath, SessionPoolConfig{}) 1511 // Client creation should succeed even if the database is invalid. 1512 if err != nil { 1513 t.Fatal(err) 1514 } 1515 _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"}) 1516 if msg, ok := matchError(err, codes.NotFound, ""); !ok { 1517 t.Fatal(msg) 1518 } 1519} 1520 1521func TestIntegration_ReadErrors(t *testing.T) { 1522 t.Parallel() 1523 1524 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1525 defer cancel() 1526 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1527 defer cleanup() 1528 1529 var ms []*Mutation 1530 for i := 0; i < 2; i++ { 1531 ms = append(ms, InsertOrUpdate(testTable, 1532 testTableColumns, 1533 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")})) 1534 } 1535 if _, err := client.Apply(ctx, ms); err != nil { 1536 t.Fatal(err) 1537 } 1538 1539 // Read over invalid table fails 1540 _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"}) 1541 if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok { 1542 t.Error(msg) 1543 } 1544 // Read over invalid column 
fails 1545 _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"}) 1546 if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok { 1547 t.Error(msg) 1548 } 1549 1550 // Invalid query fails 1551 iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"}) 1552 defer iter.Stop() 1553 _, err = iter.Next() 1554 if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok { 1555 t.Error(msg) 1556 } 1557 1558 // Read should fail on cancellation. 1559 cctx, cancel := context.WithCancel(ctx) 1560 cancel() 1561 _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"}) 1562 if msg, ok := matchError(err, codes.Canceled, ""); !ok { 1563 t.Error(msg) 1564 } 1565 // Read should fail if deadline exceeded. 1566 dctx, cancel := context.WithTimeout(ctx, time.Nanosecond) 1567 defer cancel() 1568 <-dctx.Done() 1569 _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"}) 1570 if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok { 1571 t.Error(msg) 1572 } 1573 // Read should fail if there are multiple rows returned. 1574 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns) 1575 wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex) 1576 if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok { 1577 t.Error(msg) 1578 } 1579} 1580 1581// Test TransactionRunner. Test that transactions are aborted and retried as 1582// expected. 1583func TestIntegration_TransactionRunner(t *testing.T) { 1584 skipEmulatorTest(t) 1585 t.Parallel() 1586 1587 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1588 defer cancel() 1589 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1590 defer cleanup() 1591 1592 // Test 1: User error should abort the transaction. 
1593 _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1594 tx.BufferWrite([]*Mutation{ 1595 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})}) 1596 return errors.New("user error") 1597 }) 1598 // Empty read. 1599 rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"})) 1600 if err != nil { 1601 t.Fatal(err) 1602 } 1603 if got, want := len(rows), 0; got != want { 1604 t.Errorf("Empty read, got %d, want %d.", got, want) 1605 } 1606 1607 // Test 2: Expect abort and retry. 1608 // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by 1609 // committing writes to the column txn2 have read, and expect the following 1610 // read to abort and txn2 retries. 1611 1612 // Set up two accounts 1613 accounts := []*Mutation{ 1614 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}), 1615 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}), 1616 } 1617 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1618 t.Fatal(err) 1619 } 1620 1621 var ( 1622 cTxn1Start = make(chan struct{}) 1623 cTxn1Commit = make(chan struct{}) 1624 cTxn2Start = make(chan struct{}) 1625 wg sync.WaitGroup 1626 ) 1627 1628 // read balance, check error if we don't expect abort. 
1629 readBalance := func(tx interface { 1630 ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) 1631 }, key int64, expectAbort bool) (int64, error) { 1632 var b int64 1633 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"}) 1634 if e != nil { 1635 if expectAbort && !isAbortErr(e) { 1636 t.Errorf("ReadRow got %v, want Abort error.", e) 1637 } 1638 return b, e 1639 } 1640 if ce := r.Column(0, &b); ce != nil { 1641 return b, ce 1642 } 1643 return b, nil 1644 } 1645 1646 wg.Add(2) 1647 // Txn 1 1648 go func() { 1649 defer wg.Done() 1650 var once sync.Once 1651 _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1652 b, e := readBalance(tx, 1, false) 1653 if e != nil { 1654 return e 1655 } 1656 // txn 1 can abort, in that case we skip closing the channel on 1657 // retry. 1658 once.Do(func() { close(cTxn1Start) }) 1659 e = tx.BufferWrite([]*Mutation{ 1660 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})}) 1661 if e != nil { 1662 return e 1663 } 1664 // Wait for second transaction. 1665 <-cTxn2Start 1666 return nil 1667 }) 1668 close(cTxn1Commit) 1669 if e != nil { 1670 t.Errorf("Transaction 1 commit, got %v, want nil.", e) 1671 } 1672 }() 1673 // Txn 2 1674 go func() { 1675 // Wait until txn 1 starts. 1676 <-cTxn1Start 1677 defer wg.Done() 1678 var ( 1679 once sync.Once 1680 b1 int64 1681 b2 int64 1682 e error 1683 ) 1684 _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1685 if b1, e = readBalance(tx, 1, false); e != nil { 1686 return e 1687 } 1688 // Skip closing channel on retry. 1689 once.Do(func() { close(cTxn2Start) }) 1690 // Wait until txn 1 successfully commits. 1691 <-cTxn1Commit 1692 // Txn1 has committed and written a balance to the account. Now this 1693 // transaction (txn2) reads and re-writes the balance. 
The first 1694 // time through, it will abort because it overlaps with txn1. Then 1695 // it will retry after txn1 commits, and succeed. 1696 if b2, e = readBalance(tx, 2, true); e != nil { 1697 return e 1698 } 1699 return tx.BufferWrite([]*Mutation{ 1700 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})}) 1701 }) 1702 if e != nil { 1703 t.Errorf("Transaction 2 commit, got %v, want nil.", e) 1704 } 1705 }() 1706 wg.Wait() 1707 // Check that both transactions' effects are visible. 1708 for i := int64(1); i <= int64(2); i++ { 1709 if b, e := readBalance(client.Single(), i, false); e != nil { 1710 t.Fatalf("ReadBalance for key %d error %v.", i, e) 1711 } else if b != i { 1712 t.Errorf("Balance for key %d, got %d, want %d.", i, b, i) 1713 } 1714 } 1715} 1716 1717// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then 1718// serialize and deserialize both transaction and partition to be used in 1719// execution on another client, and compare results. 1720func TestIntegration_BatchQuery(t *testing.T) { 1721 skipEmulatorTest(t) 1722 t.Parallel() 1723 1724 // Set up testing environment. 
1725 var ( 1726 client2 *Client 1727 err error 1728 ) 1729 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1730 defer cancel() 1731 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 1732 defer cleanup() 1733 1734 if err = populate(ctx, client); err != nil { 1735 t.Fatal(err) 1736 } 1737 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 1738 t.Fatal(err) 1739 } 1740 defer client2.Close() 1741 1742 // PartitionQuery 1743 var ( 1744 txn *BatchReadOnlyTransaction 1745 partitions []*Partition 1746 stmt = Statement{SQL: "SELECT * FROM test;"} 1747 ) 1748 1749 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1750 t.Fatal(err) 1751 } 1752 defer txn.Cleanup(ctx) 1753 if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { 1754 t.Fatal(err) 1755 } 1756 1757 // Reconstruct BatchReadOnlyTransactionID and execute partitions 1758 var ( 1759 tid2 BatchReadOnlyTransactionID 1760 data []byte 1761 gotResult bool // if we get matching result from two separate txns 1762 ) 1763 if data, err = txn.ID.MarshalBinary(); err != nil { 1764 t.Fatalf("encoding failed %v", err) 1765 } 1766 if err = tid2.UnmarshalBinary(data); err != nil { 1767 t.Fatalf("decoding failed %v", err) 1768 } 1769 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1770 1771 // Execute Partitions and compare results 1772 for i, p := range partitions { 1773 iter := txn.Execute(ctx, p) 1774 defer iter.Stop() 1775 p2 := serdesPartition(t, i, p) 1776 iter2 := txn2.Execute(ctx, &p2) 1777 defer iter2.Stop() 1778 1779 row1, err1 := iter.Next() 1780 row2, err2 := iter2.Next() 1781 if err1 != err2 { 1782 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1783 continue 1784 } 1785 if !testEqual(row1, row2) { 1786 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1787 continue 1788 } 1789 if row1 == nil { 1790 continue 1791 
}
		var a, b string
		if err = row1.Columns(&a, &b); err != nil {
			t.Fatalf("failed to parse row %v", err)
			continue
		}
		if a == str1 && b == str2 {
			gotResult = true
		}
	}
	if !gotResult {
		t.Fatalf("execution didn't return expected values")
	}
}

// TestIntegration_BatchRead tests PartitionRead of BatchReadOnlyTransaction,
// similar to the batch query test: the partitions produced by one client are
// executed both by the original transaction and by a second client that
// reconstructs the transaction from its serialized ID, and the first row of
// each execution is compared.
func TestIntegration_BatchRead(t *testing.T) {
	skipEmulatorTest(t)
	t.Parallel()

	// Set up testing environment.
	var (
		client2 *Client
		err     error
	)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
	defer cleanup()

	if err = populate(ctx, client); err != nil {
		t.Fatal(err)
	}
	if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
		t.Fatal(err)
	}
	defer client2.Close()

	// PartitionRead
	var (
		txn        *BatchReadOnlyTransaction
		partitions []*Partition
	)

	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
		t.Fatal(err)
	}
	defer txn.Cleanup(ctx)
	if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
		t.Fatal(err)
	}

	// Reconstruct BatchReadOnlyTransactionID and execute partitions.
	var (
		tid2      BatchReadOnlyTransactionID
		data      []byte
		gotResult bool // if we get matching result from two separate txns
	)
	if data, err = txn.ID.MarshalBinary(); err != nil {
		t.Fatalf("encoding failed %v", err)
	}
	if err = tid2.UnmarshalBinary(data); err != nil {
		t.Fatalf("decoding failed %v", err)
	}
	txn2 := client2.BatchReadOnlyTransactionFromID(tid2)

	// Execute Partitions and compare results. Each partition runs in its own
	// function scope so that both iterators are stopped as soon as the
	// partition has been checked, instead of piling up until the test returns.
	for i, p := range partitions {
		func() {
			iter := txn.Execute(ctx, p)
			defer iter.Stop()
			p2 := serdesPartition(t, i, p)
			iter2 := txn2.Execute(ctx, &p2)
			defer iter2.Stop()

			row1, err1 := iter.Next()
			row2, err2 := iter2.Next()
			if err1 != err2 {
				t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
			}
			if !testEqual(row1, row2) {
				t.Fatalf("execution returned different values: %v, %v", row1, row2)
			}
			if row1 == nil {
				// Nothing to decode for this partition.
				return
			}
			var a, b string
			if err = row1.Columns(&a, &b); err != nil {
				t.Fatalf("failed to parse row %v", err)
			}
			if a == str1 && b == str2 {
				gotResult = true
			}
		}()
	}
	if !gotResult {
		t.Fatalf("execution didn't return expected values")
	}
}

// TestIntegration_BROTNormal tests the normal txReadEnv methods on
// BatchReadOnlyTransaction: after partitioning a read, an ordinary query on
// the same transaction must still work.
func TestIntegration_BROTNormal(t *testing.T) {
	skipEmulatorTest(t)
	t.Parallel()

	// Set up testing environment and create txn.
	var (
		txn *BatchReadOnlyTransaction
		err error
		row *Row
		i   int64
	)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements)
	defer cleanup()

	if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
		t.Fatal(err)
	}
	defer txn.Cleanup(ctx)
	if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
		t.Fatal(err)
	}
	// Normal query should work with BatchReadOnlyTransaction.
	stmt2 := Statement{SQL: "SELECT 1"}
	iter := txn.Query(ctx, stmt2)
	defer iter.Stop()

	row, err = iter.Next()
	if err != nil {
		// Stop here: there is no row to decode, and continuing would
		// dereference a nil *Row below.
		t.Fatalf("query failed with %v", err)
	}
	if err = row.Columns(&i); err != nil {
		t.Errorf("failed to parse row %v", err)
	}
}

// TestIntegration_CommitTimestamp inserts two rows whose Ts column is set to
// the commit-timestamp placeholder, checks that the timestamp returned by
// Apply equals the one read back from the table, and finally verifies that
// writing a timestamp in the future is rejected with FailedPrecondition.
func TestIntegration_CommitTimestamp(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
	defer cleanup()

	type testTableRow struct {
		Key string
		Ts  NullTime
	}

	var (
		cts1, cts2, ts1, ts2 time.Time
		err                  error
	)

	// Apply mutation in sequence, expect to see commit timestamp in good order,
	// check also the commit timestamp returned
	for _, it := range []struct {
		k string
		t *time.Time
	}{
		{"a", &cts1},
		{"b", &cts2},
	} {
		tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
		m, err := InsertStruct("TestTable", tt)
		if err != nil {
			t.Fatal(err)
		}
		*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
		if err != nil {
			t.Fatal(err)
		}
	}

	txn := client.ReadOnlyTransaction()
	// Release the transaction's resources once the reads are done.
	defer txn.Close()
	for _, it := range []struct {
		k string
		t *time.Time
	}{
		{"a", &ts1},
		{"b", &ts2},
	} {
		r, err := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"})
		if err != nil {
			// Report the read error itself, not the unrelated outer err.
			t.Fatal(err)
		}
		var got testTableRow
		if err := r.ToStruct(&got); err != nil {
			t.Fatal(err)
		}
		*it.t = got.Ts.Time
	}
	if !cts1.Equal(ts1) {
		t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
	}
	if !cts2.Equal(ts2) {
		t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
	}

	// Try writing a timestamp in the future to commit timestamp, expect error.
	_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
	if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
		t.Error(msg)
	}
}

// TestIntegration_DML exercises DML statements inside read/write
// transactions: via Query, via Update, rolled back by a failing callback, two
// statements in one transaction, DML mixed with a buffered mutation, and a
// SELECT passed to Update (expected to fail with InvalidArgument).
func TestIntegration_DML(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	// Function that reads a single row's first name from within a transaction.
	readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
		row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
		if err != nil {
			return "", err
		}
		var fn string
		if err := row.Column(0, &fn); err != nil {
			return "", err
		}
		return fn, nil
	}

	// Function that reads multiple rows' first names from outside a read/write
	// transaction.
	readFirstNames := func(keys ...int) []string {
		var ks []KeySet
		for _, k := range keys {
			ks = append(ks, Key{k})
		}
		iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
		var got []string
		var fn string
		err := iter.Do(func(row *Row) error {
			if err := row.Column(0, &fn); err != nil {
				return err
			}
			got = append(got, fn)
			return nil
		})
		if err != nil {
			t.Fatalf("readFirstNames(%v): %v", keys, err)
		}
		return got
	}

	// Use ReadWriteTransaction.Query to execute a DML statement.
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		iter := tx.Query(ctx, Statement{
			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
		})
		defer iter.Stop()
		if row, err := iter.Next(); err != iterator.Done {
			t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
		}
		if iter.RowCount != 1 {
			t.Errorf("row count: got %d, want 1", iter.RowCount)
		}
		// The results of the DML statement should be visible to the transaction.
		got, err := readFirstName(tx, 1)
		if err != nil {
			return err
		}
		if want := "Umm"; got != want {
			t.Errorf("got %q, want %q", got, want)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	// Use ReadWriteTransaction.Update to execute a DML statement.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		count, err := tx.Update(ctx, Statement{
			SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
		})
		if err != nil {
			// Hand the error back to the transaction runner, like the other
			// callbacks in this test; the outer check fails the test if the
			// error is final.
			return err
		}
		if count != 1 {
			t.Errorf("row count: got %d, want 1", count)
		}
		got, err := readFirstName(tx, 2)
		if err != nil {
			return err
		}
		if want := "Eduard"; got != want {
			t.Errorf("got %q, want %q", got, want)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	// Roll back a DML statement and confirm that it didn't happen.
	var fail = errors.New("fail")
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{
			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
		})
		if err != nil {
			return err
		}
		return fail
	})
	if err != fail {
		t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
	}
	_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
	if got, want := ErrCode(err), codes.NotFound; got != want {
		t.Errorf("got %s, want %s", got, want)
	}

	// Run two DML statements in the same transaction.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`})
		if err != nil {
			return err
		}
		_, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`})
		if err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	got := readFirstNames(1, 2)
	want := []string{"Oum", "Eddie"}
	if !testEqual(got, want) {
		t.Errorf("got %v, want %v", got, want)
	}

	// Run a DML statement and an ordinary mutation in the same transaction.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{
			SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
		})
		if err != nil {
			return err
		}
		return tx.BufferWrite([]*Mutation{
			Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
				[]interface{}{4, "Andy", "Irvine"}),
		})
	})
	if err != nil {
		t.Fatal(err)
	}
	got = readFirstNames(3, 4)
	want = []string{"Audra", "Andy"}
	if !testEqual(got, want) {
		t.Errorf("got %v, want %v", got, want)
	}

	// Attempt to run a query using update.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		_, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`})
		return err
	})
	if got, want := ErrCode(err), codes.InvalidArgument; got != want {
		t.Errorf("got %s, want %s", got, want)
	}
}

// TestIntegration_StructParametersBind binds Go structs (and slices of
// structs) to the query parameter @p and compares the rows returned by each
// query with rows built locally via NewRow.
func TestIntegration_StructParametersBind(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
	defer cleanup()

	type tRow []interface{}
	type tRows []struct{ trow tRow }

	type allFields struct {
		Stringf string
		Intf    int
		Boolf   bool
		Floatf  float64
		Bytef   []byte
		Timef   time.Time
		Datef   civil.Date
	}
	allColumns := []string{
		"Stringf",
		"Intf",
		"Boolf",
		"Floatf",
		"Bytef",
		"Timef",
		"Datef",
	}
	s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
	s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}

	// A struct type assembled at run time, with one field tagged "ff1".
	dynamicStructType := reflect.StructOf([]reflect.StructField{
		{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
	})
	s3 := reflect.New(dynamicStructType)
	s3.Elem().Field(0).Set(reflect.ValueOf(t1))

	for i, test := range []struct {
		param interface{}
		sql   string
		cols  []string
		trows tRows
	}{
		// Struct value.
		{
			s1,
			"SELECT" +
				" @p.Stringf," +
				" @p.Intf," +
				" @p.Boolf," +
				" @p.Floatf," +
				" @p.Bytef," +
				" @p.Timef," +
				" @p.Datef",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
			},
		},
		// Array of struct value.
		{
			[]allFields{s1, s2},
			"SELECT * FROM UNNEST(@p)",
			allColumns,
			tRows{
				{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
				{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
			},
		},
		// Null struct.
		{
			(*allFields)(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Null Array of struct.
		{
			[]allFields(nil),
			"SELECT @p IS NULL",
			[]string{""},
			tRows{
				{tRow{true}},
			},
		},
		// Empty struct.
		{
			struct{}{},
			"SELECT @p IS NULL ",
			[]string{""},
			tRows{
				{tRow{false}},
			},
		},
		// Empty array of struct.
		{
			[]allFields{},
			"SELECT * FROM UNNEST(@p) ",
			allColumns,
			tRows{},
		},
		// Struct with duplicate fields.
		{
			struct {
				A int `spanner:"field"`
				B int `spanner:"field"`
			}{10, 20},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{"field", "field"},
			tRows{
				{tRow{10, 20}},
			},
		},
		// Struct with unnamed fields.
		{
			struct {
				A string `spanner:""`
			}{"hello"},
			"SELECT * FROM UNNEST([@p]) ",
			[]string{""},
			tRows{
				{tRow{"hello"}},
			},
		},
		// Mixed struct.
		{
			struct {
				DynamicStructField interface{}  `spanner:"f1"`
				ArrayStructField   []*allFields `spanner:"f2"`
			}{
				DynamicStructField: s3.Interface(),
				ArrayStructField:   []*allFields{nil},
			},
			"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
			[]string{"ff1", "", ""},
			tRows{
				{tRow{t1, 1, true}},
			},
		},
	} {
		iter := client.Single().Query(ctx, Statement{
			SQL:    test.sql,
			Params: map[string]interface{}{"p": test.param},
		})
		var gotRows []*Row
		err := iter.Do(func(r *Row) error {
			gotRows = append(gotRows, r)
			return nil
		})
		if err != nil {
			t.Errorf("Failed to execute test case %d, error: %v", i, err)
		}

		// Build the expected rows from the literal values of the test case.
		var wantRows []*Row
		for j, row := range test.trows {
			r, err := NewRow(test.cols, row.trow)
			if err != nil {
				t.Errorf("Invalid row %d in test case %d", j, i)
			}
			wantRows = append(wantRows, r)
		}
		if !testEqual(gotRows, wantRows) {
			t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
		}
	}
}

// TestIntegration_PDML runs a partitioned UPDATE over a range of singer IDs
// and checks both the reported row count and the resulting table contents.
func TestIntegration_PDML(t *testing.T) {
	skipEmulatorTest(t)
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	columns := []string{"SingerId", "FirstName", "LastName"}

	// Populate the Singers table.
	var muts []*Mutation
	for _, row := range [][]interface{}{
		{1, "Umm", "Kulthum"},
		{2, "Eduard", "Khil"},
		{3, "Audra", "McDonald"},
		{4, "Enrique", "Iglesias"},
		{5, "Shakira", "Ripoll"},
	} {
		muts = append(muts, Insert("Singers", columns, row))
	}
	if _, err := client.Apply(ctx, muts); err != nil {
		t.Fatal(err)
	}
	// Identifiers in PDML statements must be fully qualified.
	// TODO(jba): revisit the above.
	count, err := client.PartitionedUpdate(ctx, Statement{
		SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`,
		Params: map[string]interface{}{
			"end": 3,
		},
	})
	if err != nil {
		t.Fatal(err)
	}
	if want := int64(3); count != want {
		t.Fatalf("got %d, want %d", count, want)
	}
	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
	if err != nil {
		t.Fatal(err)
	}
	want := [][]interface{}{
		{int64(1), "changed", "Kulthum"},
		{int64(2), "changed", "Khil"},
		{int64(3), "changed", "McDonald"},
		{int64(4), "Enrique", "Iglesias"},
		{int64(5), "Shakira", "Ripoll"},
	}
	if !testEqual(got, want) {
		t.Errorf("\ngot %v\nwant%v", got, want)
	}
}

// TestBatchDML sends three UPDATE statements in a single BatchUpdate call and
// checks the per-statement row counts and the resulting table contents.
func TestBatchDML(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	columns := []string{"SingerId", "FirstName", "LastName"}

	// Populate the Singers table.
	var muts []*Mutation
	for _, row := range [][]interface{}{
		{1, "Umm", "Kulthum"},
		{2, "Eduard", "Khil"},
		{3, "Audra", "McDonald"},
	} {
		muts = append(muts, Insert("Singers", columns, row))
	}
	if _, err := client.Apply(ctx, muts); err != nil {
		t.Fatal(err)
	}

	var counts []int64
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
		counts, err = tx.BatchUpdate(ctx, []Statement{
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
		})
		return err
	})

	if err != nil {
		t.Fatal(err)
	}
	if want := []int64{1, 1, 1}; !testEqual(counts, want) {
		t.Fatalf("got %d, want %d", counts, want)
	}
	got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
	if err != nil {
		t.Fatal(err)
	}
	want := [][]interface{}{
		{int64(1), "changed 1", "Kulthum"},
		{int64(2), "changed 2", "Khil"},
		{int64(3), "changed 3", "McDonald"},
	}
	if !testEqual(got, want) {
		t.Errorf("\ngot %v\nwant%v", got, want)
	}
}

// TestBatchDML_NoStatements checks that a BatchUpdate call with an empty
// statement list fails with an InvalidArgument status.
func TestBatchDML_NoStatements(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
		_, err = tx.BatchUpdate(ctx, []Statement{})
		return err
	})
	if err == nil {
		t.Fatal("expected error, got nil")
	}
	if s, ok := status.FromError(err); ok {
		if s.Code() != codes.InvalidArgument {
			t.Fatalf("expected InvalidArgument, got %v", err)
		}
	} else {
		t.Fatalf("expected InvalidArgument, got %v", err)
	}
}

// TestBatchDML_TwoStatements runs a BatchUpdate followed by a plain Update in
// the same read/write transaction and checks the row counts of both.
func TestBatchDML_TwoStatements(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
	defer cleanup()

	columns := []string{"SingerId", "FirstName", "LastName"}

	// Populate the Singers table.
	var muts []*Mutation
	for _, row := range [][]interface{}{
		{1, "Umm", "Kulthum"},
		{2, "Eduard", "Khil"},
		{3, "Audra", "McDonald"},
	} {
		muts = append(muts, Insert("Singers", columns, row))
	}
	if _, err := client.Apply(ctx, muts); err != nil {
		t.Fatal(err)
	}

	var updateCount int64
	var batchCounts []int64
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
		batchCounts, err = tx.BatchUpdate(ctx, []Statement{
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`},
			{SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`},
		})
		if err != nil {
			return err
		}

		updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`})
		return err
	})
	if err != nil {
		t.Fatal(err)
	}
	if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
		t.Fatalf("got %d, want %d", batchCounts, want)
	}
	if updateCount != 1 {
		t.Fatalf("got %v, want 1", updateCount)
	}
}

// TODO(deklerk): this currently does not work because the transaction appears to
// get rolled back after a
single statement fails. b/120158761 2520func TestBatchDML_Error(t *testing.T) { 2521 t.Parallel() 2522 2523 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2524 defer cancel() 2525 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2526 defer cleanup() 2527 2528 columns := []string{"SingerId", "FirstName", "LastName"} 2529 2530 // Populate the Singers table. 2531 var muts []*Mutation 2532 for _, row := range [][]interface{}{ 2533 {1, "Umm", "Kulthum"}, 2534 {2, "Eduard", "Khil"}, 2535 {3, "Audra", "McDonald"}, 2536 } { 2537 muts = append(muts, Insert("Singers", columns, row)) 2538 } 2539 if _, err := client.Apply(ctx, muts); err != nil { 2540 t.Fatal(err) 2541 } 2542 2543 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2544 counts, err := tx.BatchUpdate(ctx, []Statement{ 2545 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2546 {SQL: `some illegal statement`}, 2547 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2548 }) 2549 if err == nil { 2550 t.Fatal("expected err, got nil") 2551 } 2552 if want := []int64{1}; !testEqual(counts, want) { 2553 t.Fatalf("got %d, want %d", counts, want) 2554 } 2555 2556 got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns)) 2557 if err != nil { 2558 t.Fatal(err) 2559 } 2560 want := [][]interface{}{ 2561 {int64(1), "changed 1", "Kulthum"}, 2562 {int64(2), "Eduard", "Khil"}, 2563 {int64(3), "Audra", "McDonald"}, 2564 } 2565 if !testEqual(got, want) { 2566 t.Errorf("\ngot %v\nwant%v", got, want) 2567 } 2568 2569 return nil 2570 }) 2571 if err != nil { 2572 t.Fatal(err) 2573 } 2574} 2575 2576// Prepare initializes Cloud Spanner testing DB and clients. 
2577func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { 2578 if databaseAdmin == nil { 2579 t.Skip("Integration tests skipped") 2580 } 2581 // Construct a unique test DB name. 2582 dbName := dbNameSpace.New() 2583 2584 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName) 2585 // Create database and tables. 2586 op, err := databaseAdmin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 2587 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 2588 CreateStatement: "CREATE DATABASE " + dbName, 2589 ExtraStatements: statements, 2590 }) 2591 if err != nil { 2592 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2593 } 2594 if _, err := op.Wait(ctx); err != nil { 2595 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2596 } 2597 client, err := createClient(ctx, dbPath, spc) 2598 if err != nil { 2599 t.Fatalf("cannot create data client on DB %v: %v", dbPath, err) 2600 } 2601 return client, dbPath, func() { 2602 client.Close() 2603 } 2604} 2605 2606func cleanupInstances() { 2607 if instanceAdmin == nil { 2608 // Integration tests skipped. 
2609 return 2610 } 2611 2612 ctx := context.Background() 2613 parent := fmt.Sprintf("projects/%v", testProjectID) 2614 iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{ 2615 Parent: parent, 2616 Filter: "name:gotest-", 2617 }) 2618 expireAge := 24 * time.Hour 2619 2620 for { 2621 inst, err := iter.Next() 2622 if err == iterator.Done { 2623 break 2624 } 2625 if err != nil { 2626 panic(err) 2627 } 2628 if instanceNameSpace.Older(inst.Name, expireAge) { 2629 log.Printf("Deleting instance %s", inst.Name) 2630 2631 if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: inst.Name}); err != nil { 2632 log.Printf("failed to delete instance %s (error %v), might need a manual removal", 2633 inst.Name, err) 2634 } 2635 } 2636 } 2637} 2638 2639func rangeReads(ctx context.Context, t *testing.T, client *Client) { 2640 checkRange := func(ks KeySet, wantNums ...int) { 2641 if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok { 2642 t.Errorf("key set %+v: %s", ks, msg) 2643 } 2644 } 2645 2646 checkRange(Key{"k1"}, 1) 2647 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4) 2648 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5) 2649 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5) 2650 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4) 2651 2652 // Partial key specification. 2653 checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9) 2654 checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9) 2655 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10) 2656 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11) 2657 2658 // The following produce empty ranges. 2659 // TODO(jba): Consider a multi-part key to illustrate partial key behavior. 
2660 // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen}) 2661 // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen}) 2662 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen}) 2663 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed}) 2664 2665 // Prefix is component-wise, not string prefix. 2666 checkRange(Key{"k1"}.AsPrefix(), 1) 2667 checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2668 2669 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2670} 2671 2672func indexRangeReads(ctx context.Context, t *testing.T, client *Client) { 2673 checkRange := func(ks KeySet, wantNums ...int) { 2674 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns), 2675 wantNums); !ok { 2676 t.Errorf("key set %+v: %s", ks, msg) 2677 } 2678 } 2679 2680 checkRange(Key{"v1"}, 1) 2681 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4) 2682 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5) 2683 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5) 2684 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4) 2685 2686 // // Partial key specification. 2687 checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9) 2688 checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9) 2689 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10) 2690 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11) 2691 2692 // // The following produce empty ranges. 2693 // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen}) 2694 // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen}) 2695 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen}) 2696 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed}) 2697 2698 // // Prefix is component-wise, not string prefix. 
2699 checkRange(Key{"v1"}.AsPrefix(), 1) 2700 checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2701 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2702 2703 // Read from an index with DESC ordering. 2704 wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} 2705 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns), 2706 wantNums); !ok { 2707 t.Errorf("desc: %s", msg) 2708 } 2709} 2710 2711type testTableRow struct{ Key, StringValue string } 2712 2713func compareRows(iter *RowIterator, wantNums []int) (string, bool) { 2714 rows, err := readAllTestTable(iter) 2715 if err != nil { 2716 return err.Error(), false 2717 } 2718 want := map[string]string{} 2719 for _, n := range wantNums { 2720 want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n) 2721 } 2722 got := map[string]string{} 2723 for _, r := range rows { 2724 got[r.Key] = r.StringValue 2725 } 2726 if !testEqual(got, want) { 2727 return fmt.Sprintf("got %v, want %v", got, want), false 2728 } 2729 return "", true 2730} 2731 2732func isNaN(x interface{}) bool { 2733 f, ok := x.(float64) 2734 if !ok { 2735 return false 2736 } 2737 return math.IsNaN(f) 2738} 2739 2740// createClient creates Cloud Spanner data client. 2741func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) { 2742 if os.Getenv("SPANNER_EMULATOR_HOST") == "" { 2743 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{ 2744 SessionPoolConfig: spc, 2745 }, option.WithTokenSource(testutil.TokenSource(ctx, Scope, AdminScope)), option.WithEndpoint(endpoint)) 2746 } else { 2747 // When the emulator is enabled, option.WithoutAuthentication() 2748 // will be added automatically which is incompatible with 2749 // option.WithTokenSource(testutil.TokenSource(ctx, Scope)). 
2750 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}) 2751 } 2752 2753 if err != nil { 2754 return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) 2755 } 2756 return client, nil 2757} 2758 2759// populate prepares the database with some data. 2760func populate(ctx context.Context, client *Client) error { 2761 // Populate data 2762 var err error 2763 m := InsertMap("test", map[string]interface{}{ 2764 "a": str1, 2765 "b": str2, 2766 }) 2767 _, err = client.Apply(ctx, []*Mutation{m}) 2768 return err 2769} 2770 2771func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) { 2772 if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) { 2773 return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false 2774 } 2775 return "", true 2776} 2777 2778func rowToValues(r *Row) ([]interface{}, error) { 2779 var x int64 2780 var y, z string 2781 if err := r.Column(0, &x); err != nil { 2782 return nil, err 2783 } 2784 if err := r.Column(1, &y); err != nil { 2785 return nil, err 2786 } 2787 if err := r.Column(2, &z); err != nil { 2788 return nil, err 2789 } 2790 return []interface{}{x, y, z}, nil 2791} 2792 2793func readAll(iter *RowIterator) ([][]interface{}, error) { 2794 defer iter.Stop() 2795 var vals [][]interface{} 2796 for { 2797 row, err := iter.Next() 2798 if err == iterator.Done { 2799 return vals, nil 2800 } 2801 if err != nil { 2802 return nil, err 2803 } 2804 v, err := rowToValues(row) 2805 if err != nil { 2806 return nil, err 2807 } 2808 vals = append(vals, v) 2809 } 2810} 2811 2812func readAllTestTable(iter *RowIterator) ([]testTableRow, error) { 2813 defer iter.Stop() 2814 var vals []testTableRow 2815 for { 2816 row, err := iter.Next() 2817 if err == iterator.Done { 2818 return vals, nil 2819 } 2820 if err != nil { 2821 return nil, err 2822 } 2823 var ttr testTableRow 2824 if err := 
row.ToStruct(&ttr); err != nil { 2825 return nil, err 2826 } 2827 vals = append(vals, ttr) 2828 } 2829} 2830 2831func maxDuration(a, b time.Duration) time.Duration { 2832 if a > b { 2833 return a 2834 } 2835 return b 2836} 2837 2838func skipEmulatorTest(t *testing.T) { 2839 if os.Getenv("SPANNER_EMULATOR_HOST") != "" { 2840 t.Skip("Skipping testing against the emulator.") 2841 } 2842} 2843