1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "math/big" 27 "os" 28 "reflect" 29 "regexp" 30 "strings" 31 "sync" 32 "testing" 33 "time" 34 35 "cloud.google.com/go/civil" 36 "cloud.google.com/go/internal/testutil" 37 "cloud.google.com/go/internal/uid" 38 database "cloud.google.com/go/spanner/admin/database/apiv1" 39 instance "cloud.google.com/go/spanner/admin/instance/apiv1" 40 "google.golang.org/api/iterator" 41 "google.golang.org/api/option" 42 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 43 instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" 44 sppb "google.golang.org/genproto/googleapis/spanner/v1" 45 "google.golang.org/grpc" 46 "google.golang.org/grpc/codes" 47 "google.golang.org/grpc/status" 48) 49 50var ( 51 // testProjectID specifies the project used for testing. It can be changed 52 // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. 53 testProjectID = testutil.ProjID() 54 55 dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 56 instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true}) 57 backupIDSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 58 testInstanceID = instanceNameSpace.New() 59 60 testTable = "TestTable" 61 testTableIndex = "TestTableByValue" 62 testTableColumns = []string{"Key", "StringValue"} 63 64 databaseAdmin *database.DatabaseAdminClient 65 instanceAdmin *instance.InstanceAdminClient 66 67 singerDBStatements = []string{ 68 `CREATE TABLE Singers ( 69 SingerId INT64 NOT NULL, 70 FirstName STRING(1024), 71 LastName STRING(1024), 72 SingerInfo BYTES(MAX) 73 ) PRIMARY KEY (SingerId)`, 74 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 75 `CREATE TABLE Accounts ( 76 AccountId INT64 NOT NULL, 77 Nickname STRING(100), 78 Balance INT64 NOT NULL, 79 ) PRIMARY KEY (AccountId)`, 80 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 81 `CREATE TABLE Types ( 82 RowID INT64 NOT NULL, 83 String STRING(MAX), 84 StringArray ARRAY<STRING(MAX)>, 85 Bytes BYTES(MAX), 86 BytesArray ARRAY<BYTES(MAX)>, 87 Int64a INT64, 88 Int64Array ARRAY<INT64>, 89 Bool BOOL, 90 BoolArray ARRAY<BOOL>, 91 Float64 FLOAT64, 92 Float64Array ARRAY<FLOAT64>, 93 Date DATE, 94 DateArray ARRAY<DATE>, 95 Timestamp TIMESTAMP, 96 TimestampArray ARRAY<TIMESTAMP>, 97 ) PRIMARY KEY (RowID)`, 98 } 99 100 readDBStatements = []string{ 101 `CREATE TABLE TestTable ( 102 Key STRING(MAX) NOT NULL, 103 StringValue STRING(MAX) 104 ) PRIMARY KEY (Key)`, 105 `CREATE INDEX TestTableByValue ON TestTable(StringValue)`, 106 `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`, 107 } 108 109 simpleDBStatements = []string{ 110 `CREATE TABLE test ( 111 a STRING(1024), 112 b STRING(1024), 113 ) PRIMARY KEY (a)`, 114 } 115 simpleDBTableColumns = []string{"a", "b"} 116 117 ctsDBStatements = []string{ 118 `CREATE TABLE TestTable ( 119 Key STRING(MAX) NOT NULL, 120 Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true), 121 ) PRIMARY KEY (Key)`, 122 } 123 backuDBStatements = []string{ 124 `CREATE TABLE Singers ( 125 SingerId INT64 NOT NULL, 126 FirstName STRING(1024), 127 LastName STRING(1024), 128 SingerInfo BYTES(MAX) 129 ) PRIMARY KEY (SingerId)`, 130 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 131 `CREATE TABLE Accounts ( 132 AccountId INT64 NOT NULL, 133 Nickname STRING(100), 134 Balance INT64 NOT NULL, 135 ) PRIMARY KEY (AccountId)`, 136 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 137 `CREATE TABLE Types ( 138 RowID INT64 NOT NULL, 139 String STRING(MAX), 140 StringArray ARRAY<STRING(MAX)>, 141 Bytes BYTES(MAX), 142 BytesArray ARRAY<BYTES(MAX)>, 143 Int64a INT64, 144 Int64Array ARRAY<INT64>, 145 Bool BOOL, 146 BoolArray ARRAY<BOOL>, 147 Float64 FLOAT64, 148 Float64Array ARRAY<FLOAT64>, 149 Date DATE, 150 DateArray ARRAY<DATE>, 151 Timestamp TIMESTAMP, 152 TimestampArray ARRAY<TIMESTAMP>, 153 ) PRIMARY KEY (RowID)`, 154 } 155 156 validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$") 157) 158 159func parseInstanceName(inst string) (project, instance string, err error) { 160 matches := validInstancePattern.FindStringSubmatch(inst) 161 if len(matches) == 0 { 162 return "", "", fmt.Errorf("Failed to parse instance name from %q according to pattern %q", 163 inst, validInstancePattern.String()) 164 } 165 return matches[1], matches[2], nil 166} 167 168const ( 169 str1 = "alice" 170 str2 = "a@example.com" 171) 172 173func TestMain(m *testing.M) { 174 cleanup := initIntegrationTests() 175 res := m.Run() 176 cleanup() 177 os.Exit(res) 178} 179 180var grpcHeaderChecker = testutil.DefaultHeadersEnforcer() 181 182func initIntegrationTests() (cleanup func()) { 183 ctx := context.Background() 184 flag.Parse() // Needed for testing.Short(). 185 noop := func() {} 186 187 if testing.Short() { 188 log.Println("Integration tests skipped in -short mode.") 189 return noop 190 } 191 192 if testProjectID == "" { 193 log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 194 return noop 195 } 196 197 opts := grpcHeaderChecker.CallOptions() 198 var err error 199 // Create InstanceAdmin and DatabaseAdmin clients. 200 instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) 201 if err != nil { 202 log.Fatalf("cannot create instance databaseAdmin client: %v", err) 203 } 204 databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...) 205 if err != nil { 206 log.Fatalf("cannot create databaseAdmin client: %v", err) 207 } 208 // Get the list of supported instance configs for the project that is used 209 // for the integration tests. The supported instance configs can differ per 210 // project. The integration tests will use the first instance config that 211 // is returned by Cloud Spanner. This will normally be the regional config 212 // that is physically the closest to where the request is coming from. 213 configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{ 214 Parent: fmt.Sprintf("projects/%s", testProjectID), 215 }) 216 config, err := configIterator.Next() 217 if err != nil { 218 log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err) 219 } 220 221 // First clean up any old test instances before we start the actual testing 222 // as these might cause this test run to fail. 223 cleanupInstances() 224 225 // Create a test instance to use for this test run. 226 op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{ 227 Parent: fmt.Sprintf("projects/%s", testProjectID), 228 InstanceId: testInstanceID, 229 Instance: &instancepb.Instance{ 230 Config: config.Name, 231 DisplayName: testInstanceID, 232 NodeCount: 1, 233 }, 234 }) 235 if err != nil { 236 log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err) 237 } 238 // Wait for the instance creation to finish. 239 i, err := op.Wait(ctx) 240 if err != nil { 241 log.Fatalf("waiting for instance creation to finish failed: %v", err) 242 } 243 if i.State != instancepb.Instance_READY { 244 log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State) 245 } 246 247 return func() { 248 // Delete this test instance. 249 instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID) 250 deleteInstanceAndBackups(ctx, instanceName) 251 // Delete other test instances that may be lingering around. 252 cleanupInstances() 253 databaseAdmin.Close() 254 instanceAdmin.Close() 255 } 256} 257 258func TestIntegration_InitSessionPool(t *testing.T) { 259 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 260 defer cancel() 261 // Set up an empty testing environment. 262 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{}) 263 defer cleanup() 264 sp := client.idleSessions 265 sp.mu.Lock() 266 want := sp.MinOpened 267 sp.mu.Unlock() 268 var numOpened int 269loop: 270 for { 271 select { 272 case <-ctx.Done(): 273 t.Fatalf("timed out, got %d session(s), want %d", numOpened, want) 274 default: 275 sp.mu.Lock() 276 numOpened = sp.idleList.Len() + sp.idleWriteList.Len() 277 sp.mu.Unlock() 278 if uint64(numOpened) == want { 279 break loop 280 } 281 } 282 } 283 // Delete all sessions in the pool on the backend and then try to execute a 284 // simple query. The 'Session not found' error should cause an automatic 285 // retry of the read-only transaction. 286 sp.mu.Lock() 287 s := sp.idleList.Front() 288 for { 289 if s == nil { 290 break 291 } 292 // This will delete the session on the backend without removing it 293 // from the pool. 294 s.Value.(*session).delete(context.Background()) 295 s = s.Next() 296 } 297 sp.mu.Unlock() 298 sql := "SELECT 1, 'FOO', 'BAR'" 299 tx := client.ReadOnlyTransaction() 300 defer tx.Close() 301 iter := tx.Query(context.Background(), NewStatement(sql)) 302 rows, err := readAll(iter) 303 if err != nil { 304 t.Fatalf("Unexpected error for query %q: %v", sql, err) 305 } 306 if got, want := len(rows), 1; got != want { 307 t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 308 } 309 if got, want := len(rows[0]), 3; got != want { 310 t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 311 } 312 if got, want := rows[0][0].(int64), int64(1); got != want { 313 t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 314 } 315} 316 317// Test SingleUse transaction. 318func TestIntegration_SingleUse(t *testing.T) { 319 t.Parallel() 320 321 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 322 defer cancel() 323 // Set up testing environment. 324 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 325 defer cleanup() 326 327 writes := []struct { 328 row []interface{} 329 ts time.Time 330 }{ 331 {row: []interface{}{1, "Marc", "Foo"}}, 332 {row: []interface{}{2, "Tars", "Bar"}}, 333 {row: []interface{}{3, "Alpha", "Beta"}}, 334 {row: []interface{}{4, "Last", "End"}}, 335 } 336 // Try to write four rows through the Apply API. 337 for i, w := range writes { 338 var err error 339 m := InsertOrUpdate("Singers", 340 []string{"SingerId", "FirstName", "LastName"}, 341 w.row) 342 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 343 t.Fatal(err) 344 } 345 } 346 // Calculate time difference between Cloud Spanner server and localhost to 347 // use to determine the exact staleness value to use. 348 timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0) 349 350 // Test reading rows with different timestamp bounds. 351 for i, test := range []struct { 352 name string 353 want [][]interface{} 354 tb TimestampBound 355 checkTs func(time.Time) error 356 }{ 357 { 358 name: "strong", 359 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 360 tb: StrongRead(), 361 checkTs: func(ts time.Time) error { 362 // writes[3] is the last write, all subsequent strong read 363 // should have a timestamp larger than that. 364 if ts.Before(writes[3].ts) { 365 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 366 } 367 return nil 368 }, 369 }, 370 { 371 name: "min_read_timestamp", 372 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 373 tb: MinReadTimestamp(writes[3].ts), 374 checkTs: func(ts time.Time) error { 375 if ts.Before(writes[3].ts) { 376 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 377 } 378 return nil 379 }, 380 }, 381 { 382 name: "max_staleness", 383 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 384 tb: MaxStaleness(time.Second), 385 checkTs: func(ts time.Time) error { 386 if ts.Before(writes[3].ts) { 387 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 388 } 389 return nil 390 }, 391 }, 392 { 393 name: "read_timestamp", 394 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 395 tb: ReadTimestamp(writes[2].ts), 396 checkTs: func(ts time.Time) error { 397 if ts != writes[2].ts { 398 return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts) 399 } 400 return nil 401 }, 402 }, 403 { 404 name: "exact_staleness", 405 want: nil, 406 // Specify a staleness which should be already before this test. 407 tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second), 408 checkTs: func(ts time.Time) error { 409 if !ts.Before(writes[0].ts) { 410 return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts) 411 } 412 return nil 413 }, 414 }, 415 } { 416 t.Run(test.name, func(t *testing.T) { 417 // SingleUse.Query 418 su := client.Single().WithTimestampBound(test.tb) 419 got, err := readAll(su.Query( 420 ctx, 421 Statement{ 422 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId", 423 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 424 })) 425 if err != nil { 426 t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) 427 } 428 rts, err := su.Timestamp() 429 if err != nil { 430 t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) 431 } 432 if err := test.checkTs(rts); err != nil { 433 t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err) 434 } 435 if !testEqual(got, test.want) { 436 t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want) 437 } 438 // SingleUse.Read 439 su = client.Single().WithTimestampBound(test.tb) 440 got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 441 if err != nil { 442 t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) 443 } 444 rts, err = su.Timestamp() 445 if err != nil { 446 t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) 447 } 448 if err := test.checkTs(rts); err != nil { 449 t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err) 450 } 451 if !testEqual(got, test.want) { 452 t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want) 453 } 454 // SingleUse.ReadRow 455 got = nil 456 for _, k := range []Key{{1}, {3}, {4}} { 457 su = client.Single().WithTimestampBound(test.tb) 458 r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 459 if err != nil { 460 continue 461 } 462 v, err := rowToValues(r) 463 if err != nil { 464 continue 465 } 466 got = append(got, v) 467 rts, err = su.Timestamp() 468 if err != nil { 469 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 470 } 471 if err := test.checkTs(rts); err != nil { 472 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 473 } 474 } 475 if !testEqual(got, test.want) { 476 t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want) 477 } 478 // SingleUse.ReadUsingIndex 479 su = client.Single().WithTimestampBound(test.tb) 480 got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 481 if err != nil { 482 t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) 483 } 484 // The results from ReadUsingIndex is sorted by the index rather than primary key. 485 if len(got) != len(test.want) { 486 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 487 } 488 for j, g := range got { 489 if j > 0 { 490 prev := got[j-1][1].(string) + got[j-1][2].(string) 491 curr := got[j][1].(string) + got[j][2].(string) 492 if strings.Compare(prev, curr) > 0 { 493 t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 494 } 495 } 496 found := false 497 for _, w := range test.want { 498 if testEqual(g, w) { 499 found = true 500 } 501 } 502 if !found { 503 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 504 break 505 } 506 } 507 rts, err = su.Timestamp() 508 if err != nil { 509 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 510 } 511 if err := test.checkTs(rts); err != nil { 512 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 513 } 514 // SingleUse.ReadRowUsingIndex 515 got = nil 516 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 517 su = client.Single().WithTimestampBound(test.tb) 518 r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 519 if err != nil { 520 continue 521 } 522 v, err := rowToValues(r) 523 if err != nil { 524 continue 525 } 526 got = append(got, v) 527 rts, err = su.Timestamp() 528 if err != nil { 529 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 530 } 531 if err := test.checkTs(rts); err != nil { 532 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 533 } 534 } 535 if !testEqual(got, test.want) { 536 t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want) 537 } 538 }) 539 } 540} 541 542// Test custom query options provided on query-level configuration. 543func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) { 544 skipEmulatorTest(t) 545 t.Parallel() 546 547 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 548 defer cancel() 549 // Set up testing environment. 550 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 551 defer cleanup() 552 553 writes := []struct { 554 row []interface{} 555 ts time.Time 556 }{ 557 {row: []interface{}{1, "Marc", "Foo"}}, 558 {row: []interface{}{2, "Tars", "Bar"}}, 559 {row: []interface{}{3, "Alpha", "Beta"}}, 560 {row: []interface{}{4, "Last", "End"}}, 561 } 562 // Try to write four rows through the Apply API. 563 for i, w := range writes { 564 var err error 565 m := InsertOrUpdate("Singers", 566 []string{"SingerId", "FirstName", "LastName"}, 567 w.row) 568 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 569 t.Fatal(err) 570 } 571 } 572 qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}} 573 got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{ 574 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 575 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 576 }, qo)) 577 578 if err != nil { 579 t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err) 580 } 581 582 want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}} 583 if !testEqual(got, want) { 584 t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want) 585 } 586} 587 588func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) { 589 t.Parallel() 590 591 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 592 defer cancel() 593 // Set up testing environment. 594 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 595 defer cleanup() 596 597 writes := []struct { 598 row []interface{} 599 ts time.Time 600 }{ 601 {row: []interface{}{1, "Marc", "Foo"}}, 602 {row: []interface{}{2, "Tars", "Bar"}}, 603 {row: []interface{}{3, "Alpha", "Beta"}}, 604 {row: []interface{}{4, "Last", "End"}}, 605 } 606 // Try to write four rows through the Apply API. 607 for i, w := range writes { 608 var err error 609 m := InsertOrUpdate("Singers", 610 []string{"SingerId", "FirstName", "LastName"}, 611 w.row) 612 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 613 t.Fatal(err) 614 } 615 } 616 617 su := client.Single() 618 const limit = 1 619 gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), 620 []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit})) 621 if err != nil { 622 t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err) 623 } 624 if got, want := len(gotRows), limit; got != want { 625 t.Errorf("got %d, want %d", got, want) 626 } 627} 628 629// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it 630// also tests for a single timestamp across multiple reads. 631func TestIntegration_ReadOnlyTransaction(t *testing.T) { 632 t.Parallel() 633 634 ctxTimeout := 5 * time.Minute 635 ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) 636 defer cancel() 637 // Set up testing environment. 638 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 639 defer cleanup() 640 641 writes := []struct { 642 row []interface{} 643 ts time.Time 644 }{ 645 {row: []interface{}{1, "Marc", "Foo"}}, 646 {row: []interface{}{2, "Tars", "Bar"}}, 647 {row: []interface{}{3, "Alpha", "Beta"}}, 648 {row: []interface{}{4, "Last", "End"}}, 649 } 650 // Try to write four rows through the Apply API. 651 for i, w := range writes { 652 var err error 653 m := InsertOrUpdate("Singers", 654 []string{"SingerId", "FirstName", "LastName"}, 655 w.row) 656 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 657 t.Fatal(err) 658 } 659 } 660 661 // For testing timestamp bound staleness. 662 <-time.After(time.Second) 663 664 // Test reading rows with different timestamp bounds. 665 for i, test := range []struct { 666 want [][]interface{} 667 tb TimestampBound 668 checkTs func(time.Time) error 669 }{ 670 // Note: min_read_timestamp and max_staleness are not supported by 671 // ReadOnlyTransaction. See API document for more details. 672 { 673 // strong 674 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 675 StrongRead(), 676 func(ts time.Time) error { 677 if ts.Before(writes[3].ts) { 678 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 679 } 680 return nil 681 }, 682 }, 683 { 684 // read_timestamp 685 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 686 ReadTimestamp(writes[2].ts), 687 func(ts time.Time) error { 688 if ts != writes[2].ts { 689 return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts) 690 } 691 return nil 692 }, 693 }, 694 { 695 // exact_staleness 696 nil, 697 // Specify a staleness which should be already before this test. 698 ExactStaleness(ctxTimeout + 1), 699 func(ts time.Time) error { 700 if ts.After(writes[0].ts) { 701 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 702 } 703 return nil 704 }, 705 }, 706 } { 707 // ReadOnlyTransaction.Query 708 ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb) 709 got, err := readAll(ro.Query( 710 ctx, 711 Statement{ 712 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId", 713 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 714 })) 715 if err != nil { 716 t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err) 717 } 718 if !testEqual(got, test.want) { 719 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want) 720 } 721 rts, err := ro.Timestamp() 722 if err != nil { 723 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err) 724 } 725 if err := test.checkTs(rts); err != nil { 726 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err) 727 } 728 roTs := rts 729 // ReadOnlyTransaction.Read 730 got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 731 if err != nil { 732 t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err) 733 } 734 if !testEqual(got, test.want) { 735 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want) 736 } 737 rts, err = ro.Timestamp() 738 if err != nil { 739 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err) 740 } 741 if err := test.checkTs(rts); err != nil { 742 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err) 743 } 744 if roTs != rts { 745 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 746 } 747 // ReadOnlyTransaction.ReadRow 748 got = nil 749 for _, k := range []Key{{1}, {3}, {4}} { 750 r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 751 if err != nil { 752 continue 753 } 754 v, err := rowToValues(r) 755 if err != nil { 756 continue 757 } 758 got = append(got, v) 759 rts, err = ro.Timestamp() 760 if err != nil { 761 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 762 } 763 if err := test.checkTs(rts); err != nil { 764 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 765 } 766 if roTs != rts { 767 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 768 } 769 } 770 if !testEqual(got, test.want) { 771 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want) 772 } 773 // SingleUse.ReadUsingIndex 774 got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 775 if err != nil { 776 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err) 777 } 778 // The results from ReadUsingIndex is sorted by the index rather than 779 // primary key. 780 if len(got) != len(test.want) { 781 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 782 } 783 for j, g := range got { 784 if j > 0 { 785 prev := got[j-1][1].(string) + got[j-1][2].(string) 786 curr := got[j][1].(string) + got[j][2].(string) 787 if strings.Compare(prev, curr) > 0 { 788 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 789 } 790 } 791 found := false 792 for _, w := range test.want { 793 if testEqual(g, w) { 794 found = true 795 } 796 } 797 if !found { 798 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 799 break 800 } 801 } 802 rts, err = ro.Timestamp() 803 if err != nil { 804 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 805 } 806 if err := test.checkTs(rts); err != nil { 807 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 808 } 809 if roTs != rts { 810 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 811 } 812 // ReadOnlyTransaction.ReadRowUsingIndex 813 got = nil 814 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 815 r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 816 if err != nil { 817 continue 818 } 819 v, err := rowToValues(r) 820 if err != nil { 821 continue 822 } 823 got = append(got, v) 824 rts, err = ro.Timestamp() 825 if err != nil { 826 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 827 } 828 if err := test.checkTs(rts); err != nil { 829 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 830 } 831 if roTs != rts { 832 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 833 } 834 } 835 if !testEqual(got, test.want) { 836 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want) 837 } 838 ro.Close() 839 } 840} 841 842// Test ReadOnlyTransaction with different timestamp bound when there's an 843// update at the same time. 844func TestIntegration_UpdateDuringRead(t *testing.T) { 845 t.Parallel() 846 847 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 848 defer cancel() 849 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 850 defer cleanup() 851 852 for i, tb := range []TimestampBound{ 853 StrongRead(), 854 ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour 855 ExactStaleness(time.Minute * 30), 856 } { 857 ro := client.ReadOnlyTransaction().WithTimestampBound(tb) 858 _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 859 if ErrCode(err) != codes.NotFound { 860 t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err) 861 } 862 863 m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i}) 864 if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 865 t.Fatal(err) 866 } 867 868 _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 869 if ErrCode(err) != codes.NotFound { 870 t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err) 871 } 872 } 873} 874 875// Test ReadWriteTransaction. 876func TestIntegration_ReadWriteTransaction(t *testing.T) { 877 t.Parallel() 878 879 // Give a longer deadline because of transaction backoffs. 880 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 881 defer cancel() 882 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 883 defer cleanup() 884 885 // Set up two accounts 886 accounts := []*Mutation{ 887 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 888 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 889 } 890 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 891 t.Fatal(err) 892 } 893 wg := sync.WaitGroup{} 894 895 readBalance := func(iter *RowIterator) (int64, error) { 896 defer iter.Stop() 897 var bal int64 898 for { 899 row, err := iter.Next() 900 if err == iterator.Done { 901 return bal, nil 902 } 903 if err != nil { 904 return 0, err 905 } 906 if err := row.Column(0, &bal); err != nil { 907 return 0, err 908 } 909 } 910 } 911 912 for i := 0; i < 20; i++ { 913 wg.Add(1) 914 go func(iter int) { 915 defer wg.Done() 916 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 917 // Query Foo's balance and Bar's balance. 918 bf, e := readBalance(tx.Query(ctx, 919 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 920 if e != nil { 921 return e 922 } 923 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 924 if e != nil { 925 return e 926 } 927 if bf <= 0 { 928 return nil 929 } 930 bf-- 931 bb++ 932 return tx.BufferWrite([]*Mutation{ 933 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 934 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 935 }) 936 }) 937 if err != nil { 938 t.Errorf("%d: failed to execute transaction: %v", iter, err) 939 } 940 }(i) 941 } 942 // Because of context timeout, all goroutines will eventually return. 943 wg.Wait() 944 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 945 var bf, bb int64 946 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"}) 947 if e != nil { 948 return e 949 } 950 if ce := r.Column(0, &bf); ce != nil { 951 return ce 952 } 953 bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"})) 954 if e != nil { 955 return e 956 } 957 if bf != 30 || bb != 21 { 958 t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) 959 } 960 return nil 961 }) 962 if err != nil { 963 t.Errorf("failed to check balances: %v", err) 964 } 965} 966 967func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) { 968 t.Parallel() 969 970 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 971 defer cancel() 972 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 973 defer cleanup() 974 975 // Set up two accounts 976 accounts := []*Mutation{ 977 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 978 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 979 } 980 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 981 t.Fatal(err) 982 } 983 984 getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) { 985 row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"}) 986 if err != nil { 987 return 0, err 988 } 989 var balance int64 990 if err := row.Column(0, &balance); err != nil { 991 return 0, err 992 } 993 return balance, nil 994 } 995 996 statements := func(txn *ReadWriteStmtBasedTransaction) error { 997 outBalance, err := getBalance(txn, Key{1}) 998 if err != nil { 999 return err 1000 } 1001 const transferAmt = 20 1002 if outBalance >= transferAmt { 1003 inBalance, err := getBalance(txn, Key{2}) 1004 if err != nil { 1005 return err 1006 } 1007 inBalance += transferAmt 1008 outBalance -= transferAmt 1009 cols := []string{"AccountId", "Balance"} 1010 txn.BufferWrite([]*Mutation{ 1011 Update("Accounts", cols, []interface{}{1, outBalance}), 1012 Update("Accounts", cols, []interface{}{2, inBalance}), 1013 }) 1014 } 1015 return nil 1016 } 1017 1018 for { 1019 tx, err := NewReadWriteStmtBasedTransaction(ctx, client) 1020 if err != nil { 1021 t.Fatalf("failed to begin a transaction: %v", err) 1022 } 1023 err = statements(tx) 1024 if err != nil && status.Code(err) != codes.Aborted { 1025 tx.Rollback(ctx) 1026 t.Fatalf("failed to execute statements: %v", err) 1027 } else if err == nil { 1028 _, err = tx.Commit(ctx) 1029 if err == nil { 1030 break 1031 } else if status.Code(err) != codes.Aborted { 1032 t.Fatalf("failed to commit a transaction: %v", err) 1033 } 1034 } 1035 // Set a default sleep time if the server delay is absent. 1036 delay := 10 * time.Millisecond 1037 if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay { 1038 delay = serverDelay 1039 } 1040 time.Sleep(delay) 1041 } 1042 1043 // Query the updated values. 1044 stmt := Statement{SQL: `SELECT AccountId, Nickname, Balance FROM Accounts ORDER BY AccountId`} 1045 iter := client.Single().Query(ctx, stmt) 1046 got, err := readAllAccountsTable(iter) 1047 if err != nil { 1048 t.Fatalf("failed to read data: %v", err) 1049 } 1050 want := [][]interface{}{ 1051 {int64(1), "Foo", int64(30)}, 1052 {int64(2), "Bar", int64(21)}, 1053 } 1054 if !testEqual(got, want) { 1055 t.Errorf("\ngot %v\nwant%v", got, want) 1056 } 1057} 1058 1059func TestIntegration_Reads(t *testing.T) { 1060 t.Parallel() 1061 1062 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1063 defer cancel() 1064 // Set up testing environment. 1065 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1066 defer cleanup() 1067 1068 // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2". 1069 var ms []*Mutation 1070 for i := 0; i < 15; i++ { 1071 ms = append(ms, InsertOrUpdate(testTable, 1072 testTableColumns, 1073 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1074 } 1075 // Don't use ApplyAtLeastOnce, so we can test the other code path. 1076 if _, err := client.Apply(ctx, ms); err != nil { 1077 t.Fatal(err) 1078 } 1079 1080 // Empty read. 1081 rows, err := readAllTestTable(client.Single().Read(ctx, testTable, 1082 KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns)) 1083 if err != nil { 1084 t.Fatal(err) 1085 } 1086 if got, want := len(rows), 0; got != want { 1087 t.Errorf("got %d, want %d", got, want) 1088 } 1089 1090 // Index empty read. 1091 rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, 1092 KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns)) 1093 if err != nil { 1094 t.Fatal(err) 1095 } 1096 if got, want := len(rows), 0; got != want { 1097 t.Errorf("got %d, want %d", got, want) 1098 } 1099 1100 // Point read. 1101 row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns) 1102 if err != nil { 1103 t.Fatal(err) 1104 } 1105 var got testTableRow 1106 if err := row.ToStruct(&got); err != nil { 1107 t.Fatal(err) 1108 } 1109 if want := (testTableRow{"k1", "v1"}); got != want { 1110 t.Errorf("got %v, want %v", got, want) 1111 } 1112 1113 // Point read not found. 1114 _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns) 1115 if ErrCode(err) != codes.NotFound { 1116 t.Fatalf("got %v, want NotFound", err) 1117 } 1118 1119 // Index point read. 1120 rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns) 1121 if err != nil { 1122 t.Fatal(err) 1123 } 1124 var gotIndex testTableRow 1125 if err := rowIndex.ToStruct(&gotIndex); err != nil { 1126 t.Fatal(err) 1127 } 1128 if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex { 1129 t.Errorf("got %v, want %v", gotIndex, wantIndex) 1130 } 1131 // Index point read not found. 1132 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns) 1133 if ErrCode(err) != codes.NotFound { 1134 t.Fatalf("got %v, want NotFound", err) 1135 } 1136 rangeReads(ctx, t, client) 1137 indexRangeReads(ctx, t, client) 1138} 1139 1140func TestIntegration_EarlyTimestamp(t *testing.T) { 1141 t.Parallel() 1142 1143 // Test that we can get the timestamp from a read-only transaction as 1144 // soon as we have read at least one row. 1145 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1146 defer cancel() 1147 // Set up testing environment. 1148 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1149 defer cleanup() 1150 1151 var ms []*Mutation 1152 for i := 0; i < 3; i++ { 1153 ms = append(ms, InsertOrUpdate(testTable, 1154 testTableColumns, 1155 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1156 } 1157 if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil { 1158 t.Fatal(err) 1159 } 1160 1161 txn := client.Single() 1162 iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1163 defer iter.Stop() 1164 // In single-use transaction, we should get an error before reading anything. 1165 if _, err := txn.Timestamp(); err == nil { 1166 t.Error("wanted error, got nil") 1167 } 1168 // After reading one row, the timestamp should be available. 1169 _, err := iter.Next() 1170 if err != nil { 1171 t.Fatal(err) 1172 } 1173 if _, err := txn.Timestamp(); err != nil { 1174 t.Errorf("got %v, want nil", err) 1175 } 1176 1177 txn = client.ReadOnlyTransaction() 1178 defer txn.Close() 1179 iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1180 defer iter.Stop() 1181 // In an ordinary read-only transaction, the timestamp should be 1182 // available immediately. 1183 if _, err := txn.Timestamp(); err != nil { 1184 t.Errorf("got %v, want nil", err) 1185 } 1186} 1187 1188func TestIntegration_NestedTransaction(t *testing.T) { 1189 t.Parallel() 1190 1191 // You cannot use a transaction from inside a read-write transaction. 1192 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1193 defer cancel() 1194 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1195 defer cleanup() 1196 1197 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1198 _, err := client.ReadWriteTransaction(ctx, 1199 func(context.Context, *ReadWriteTransaction) error { return nil }) 1200 if ErrCode(err) != codes.FailedPrecondition { 1201 t.Fatalf("got %v, want FailedPrecondition", err) 1202 } 1203 _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1204 if ErrCode(err) != codes.FailedPrecondition { 1205 t.Fatalf("got %v, want FailedPrecondition", err) 1206 } 1207 rot := client.ReadOnlyTransaction() 1208 defer rot.Close() 1209 _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1210 if ErrCode(err) != codes.FailedPrecondition { 1211 t.Fatalf("got %v, want FailedPrecondition", err) 1212 } 1213 return nil 1214 }) 1215 if err != nil { 1216 t.Fatal(err) 1217 } 1218} 1219 1220func TestIntegration_CreateDBRetry(t *testing.T) { 1221 t.Parallel() 1222 1223 if databaseAdmin == nil { 1224 t.Skip("Integration tests skipped") 1225 } 1226 skipEmulatorTest(t) 1227 1228 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1229 defer cancel() 1230 dbName := dbNameSpace.New() 1231 1232 // Simulate an Unavailable error on the CreateDatabase RPC. 1233 hasReturnedUnavailable := false 1234 interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1235 err := invoker(ctx, method, req, reply, cc, opts...) 1236 if !hasReturnedUnavailable && err == nil { 1237 hasReturnedUnavailable = true 1238 return status.Error(codes.Unavailable, "Injected unavailable error") 1239 } 1240 return err 1241 } 1242 1243 dbAdmin, err := database.NewDatabaseAdminClient(ctx, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptor))) 1244 if err != nil { 1245 log.Fatalf("cannot create dbAdmin client: %v", err) 1246 } 1247 op, err := dbAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{ 1248 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 1249 CreateStatement: "CREATE DATABASE " + dbName, 1250 }) 1251 if err != nil { 1252 t.Fatalf("failed to create database: %v", err) 1253 } 1254 _, err = op.Wait(ctx) 1255 if err != nil { 1256 t.Fatalf("failed to create database: %v", err) 1257 } 1258 if !hasReturnedUnavailable { 1259 t.Fatalf("interceptor did not return Unavailable") 1260 } 1261} 1262 1263// Test client recovery on database recreation. 1264func TestIntegration_DbRemovalRecovery(t *testing.T) { 1265 t.Parallel() 1266 1267 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1268 defer cancel() 1269 // Create a client with MinOpened=0 to prevent the session pool maintainer 1270 // from repeatedly trying to create sessions for the invalid database. 1271 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements) 1272 defer cleanup() 1273 1274 // Drop the testing database. 1275 if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 1276 t.Fatalf("failed to drop testing database %v: %v", dbPath, err) 1277 } 1278 1279 // Now, send the query. 1280 iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1281 defer iter.Stop() 1282 if _, err := iter.Next(); err == nil { 1283 t.Errorf("client sends query to removed database successfully, want it to fail") 1284 } 1285 1286 // Recreate database and table. 1287 dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] 1288 op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{ 1289 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 1290 CreateStatement: "CREATE DATABASE " + dbName, 1291 ExtraStatements: []string{ 1292 `CREATE TABLE Singers ( 1293 SingerId INT64 NOT NULL, 1294 FirstName STRING(1024), 1295 LastName STRING(1024), 1296 SingerInfo BYTES(MAX) 1297 ) PRIMARY KEY (SingerId)`, 1298 }, 1299 }) 1300 if err != nil { 1301 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1302 } 1303 if _, err := op.Wait(ctx); err != nil { 1304 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1305 } 1306 1307 // Now, send the query again. 1308 iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1309 defer iter.Stop() 1310 _, err = iter.Next() 1311 if err != nil && err != iterator.Done { 1312 t.Errorf("failed to send query to database %v: %v", dbPath, err) 1313 } 1314} 1315 1316// Test encoding/decoding non-struct Cloud Spanner types. 1317func TestIntegration_BasicTypes(t *testing.T) { 1318 t.Parallel() 1319 1320 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1321 defer cancel() 1322 stmts := singerDBStatements 1323 if !isEmulatorEnvSet() { 1324 stmts = []string{ 1325 `CREATE TABLE Singers ( 1326 SingerId INT64 NOT NULL, 1327 FirstName STRING(1024), 1328 LastName STRING(1024), 1329 SingerInfo BYTES(MAX) 1330 ) PRIMARY KEY (SingerId)`, 1331 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 1332 `CREATE TABLE Accounts ( 1333 AccountId INT64 NOT NULL, 1334 Nickname STRING(100), 1335 Balance INT64 NOT NULL, 1336 ) PRIMARY KEY (AccountId)`, 1337 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 1338 `CREATE TABLE Types ( 1339 RowID INT64 NOT NULL, 1340 String STRING(MAX), 1341 StringArray ARRAY<STRING(MAX)>, 1342 Bytes BYTES(MAX), 1343 BytesArray ARRAY<BYTES(MAX)>, 1344 Int64a INT64, 1345 Int64Array ARRAY<INT64>, 1346 Bool BOOL, 1347 BoolArray ARRAY<BOOL>, 1348 Float64 FLOAT64, 1349 Float64Array ARRAY<FLOAT64>, 1350 Date DATE, 1351 DateArray ARRAY<DATE>, 1352 Timestamp TIMESTAMP, 1353 TimestampArray ARRAY<TIMESTAMP>, 1354 Numeric NUMERIC, 1355 NumericArray ARRAY<NUMERIC> 1356 ) PRIMARY KEY (RowID)`, 1357 } 1358 } 1359 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts) 1360 defer cleanup() 1361 1362 t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z") 1363 // Boundaries 1364 t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z") 1365 t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z") 1366 d1, _ := civil.ParseDate("2016-11-15") 1367 // Boundaries 1368 d2, _ := civil.ParseDate("0001-01-01") 1369 d3, _ := civil.ParseDate("9999-12-31") 1370 1371 n0 := big.Rat{} 1372 n1p, _ := (&big.Rat{}).SetString("123456789") 1373 n2p, _ := (&big.Rat{}).SetString("123456789/1000000000") 1374 n1 := *n1p 1375 n2 := *n2p 1376 1377 tests := []struct { 1378 col string 1379 val interface{} 1380 want interface{} 1381 }{ 1382 {col: "String", val: ""}, 1383 {col: "String", val: "", want: NullString{"", true}}, 1384 {col: "String", val: "foo"}, 1385 {col: "String", val: "foo", want: NullString{"foo", true}}, 1386 {col: "String", val: NullString{"bar", true}, want: "bar"}, 1387 {col: "String", val: NullString{"bar", false}, want: NullString{"", false}}, 1388 {col: "String", val: nil, want: NullString{}}, 1389 {col: "StringArray", val: []string(nil), want: []NullString(nil)}, 1390 {col: "StringArray", val: []string{}, want: []NullString{}}, 1391 {col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}}, 1392 {col: "StringArray", val: []NullString(nil)}, 1393 {col: "StringArray", val: []NullString{}}, 1394 {col: "StringArray", val: []NullString{{"foo", true}, {}}}, 1395 {col: "Bytes", val: []byte{}}, 1396 {col: "Bytes", val: []byte{1, 2, 3}}, 1397 {col: "Bytes", val: []byte(nil)}, 1398 {col: "BytesArray", val: [][]byte(nil)}, 1399 {col: "BytesArray", val: [][]byte{}}, 1400 {col: "BytesArray", val: [][]byte{{1}, {2, 3}}}, 1401 {col: "Int64a", val: 0, want: int64(0)}, 1402 {col: "Int64a", val: -1, want: int64(-1)}, 1403 {col: "Int64a", val: 2, want: int64(2)}, 1404 {col: "Int64a", val: int64(3)}, 1405 {col: "Int64a", val: 4, want: NullInt64{4, true}}, 1406 {col: "Int64a", val: NullInt64{5, true}, want: int64(5)}, 1407 {col: "Int64a", val: NullInt64{6, true}, want: int64(6)}, 1408 {col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}}, 1409 {col: "Int64a", val: nil, want: NullInt64{}}, 1410 {col: "Int64Array", val: []int(nil), want: []NullInt64(nil)}, 1411 {col: "Int64Array", val: []int{}, want: []NullInt64{}}, 1412 {col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1413 {col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)}, 1414 {col: "Int64Array", val: []int64{}, want: []NullInt64{}}, 1415 {col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1416 {col: "Int64Array", val: []NullInt64(nil)}, 1417 {col: "Int64Array", val: []NullInt64{}}, 1418 {col: "Int64Array", val: []NullInt64{{1, true}, {}}}, 1419 {col: "Bool", val: false}, 1420 {col: "Bool", val: true}, 1421 {col: "Bool", val: false, want: NullBool{false, true}}, 1422 {col: "Bool", val: true, want: NullBool{true, true}}, 1423 {col: "Bool", val: NullBool{true, true}}, 1424 {col: "Bool", val: NullBool{false, false}}, 1425 {col: "Bool", val: nil, want: NullBool{}}, 1426 {col: "BoolArray", val: []bool(nil), want: []NullBool(nil)}, 1427 {col: "BoolArray", val: []bool{}, want: []NullBool{}}, 1428 {col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}}, 1429 {col: "BoolArray", val: []NullBool(nil)}, 1430 {col: "BoolArray", val: []NullBool{}}, 1431 {col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}}, 1432 {col: "Float64", val: 0.0}, 1433 {col: "Float64", val: 3.14}, 1434 {col: "Float64", val: math.NaN()}, 1435 {col: "Float64", val: math.Inf(1)}, 1436 {col: "Float64", val: math.Inf(-1)}, 1437 {col: "Float64", val: 2.78, want: NullFloat64{2.78, true}}, 1438 {col: "Float64", val: NullFloat64{2.71, true}, want: 2.71}, 1439 {col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}}, 1440 {col: "Float64", val: NullFloat64{0, false}}, 1441 {col: "Float64", val: nil, want: NullFloat64{}}, 1442 {col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)}, 1443 {col: "Float64Array", val: []float64{}, want: []NullFloat64{}}, 1444 {col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}}, 1445 {col: "Float64Array", val: []NullFloat64(nil)}, 1446 {col: "Float64Array", val: []NullFloat64{}}, 1447 {col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}}, 1448 {col: "Date", val: d1}, 1449 {col: "Date", val: d1, want: NullDate{d1, true}}, 1450 {col: "Date", val: NullDate{d1, true}}, 1451 {col: "Date", val: NullDate{d1, true}, want: d1}, 1452 {col: "Date", val: NullDate{civil.Date{}, false}}, 1453 {col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)}, 1454 {col: "DateArray", val: []civil.Date{}, want: []NullDate{}}, 1455 {col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}}, 1456 {col: "Timestamp", val: t1}, 1457 {col: "Timestamp", val: t1, want: NullTime{t1, true}}, 1458 {col: "Timestamp", val: NullTime{t1, true}}, 1459 {col: "Timestamp", val: NullTime{t1, true}, want: t1}, 1460 {col: "Timestamp", val: NullTime{}}, 1461 {col: "Timestamp", val: nil, want: NullTime{}}, 1462 {col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)}, 1463 {col: "TimestampArray", val: []time.Time{}, want: []NullTime{}}, 1464 {col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}}, 1465 } 1466 1467 if !isEmulatorEnvSet() { 1468 for _, tc := range []struct { 1469 col string 1470 val interface{} 1471 want interface{} 1472 }{ 1473 {col: "Numeric", val: n1}, 1474 {col: "Numeric", val: n2}, 1475 {col: "Numeric", val: n1, want: NullNumeric{n1, true}}, 1476 {col: "Numeric", val: n2, want: NullNumeric{n2, true}}, 1477 {col: "Numeric", val: NullNumeric{n1, true}, want: n1}, 1478 {col: "Numeric", val: NullNumeric{n1, true}, want: NullNumeric{n1, true}}, 1479 {col: "Numeric", val: NullNumeric{n0, false}}, 1480 {col: "Numeric", val: nil, want: NullNumeric{}}, 1481 {col: "NumericArray", val: []big.Rat(nil), want: []NullNumeric(nil)}, 1482 {col: "NumericArray", val: []big.Rat{}, want: []NullNumeric{}}, 1483 {col: "NumericArray", val: []big.Rat{n1, n2}, want: []NullNumeric{{n1, true}, {n2, true}}}, 1484 {col: "NumericArray", val: []NullNumeric(nil)}, 1485 {col: "NumericArray", val: []NullNumeric{}}, 1486 {col: "NumericArray", val: []NullNumeric{{n1, true}, {n2, true}, {}}}, 1487 } { 1488 tests = append(tests, tc) 1489 } 1490 } 1491 1492 // Write rows into table first. 1493 var muts []*Mutation 1494 for i, test := range tests { 1495 muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val})) 1496 } 1497 if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil { 1498 t.Fatal(err) 1499 } 1500 1501 for i, test := range tests { 1502 row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col}) 1503 if err != nil { 1504 t.Fatalf("Unable to fetch row %v: %v", i, err) 1505 } 1506 // Create new instance of type of test.want. 1507 want := test.want 1508 if want == nil { 1509 want = test.val 1510 } 1511 gotp := reflect.New(reflect.TypeOf(want)) 1512 if err := row.Column(0, gotp.Interface()); err != nil { 1513 t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err) 1514 continue 1515 } 1516 got := reflect.Indirect(gotp).Interface() 1517 1518 // One of the test cases is checking NaN handling. Given 1519 // NaN!=NaN, we can't use reflect to test for it. 1520 if isNaN(got) && isNaN(want) { 1521 continue 1522 } 1523 1524 // Check non-NaN cases. 1525 if !testEqual(got, want) { 1526 t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want) 1527 continue 1528 } 1529 } 1530} 1531 1532// Test decoding Cloud Spanner STRUCT type. 1533func TestIntegration_StructTypes(t *testing.T) { 1534 t.Parallel() 1535 1536 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1537 defer cancel() 1538 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1539 defer cleanup() 1540 1541 tests := []struct { 1542 q Statement 1543 want func(r *Row) error 1544 }{ 1545 { 1546 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`}, 1547 want: func(r *Row) error { 1548 // Test STRUCT ARRAY decoding to []NullRow. 1549 var rows []NullRow 1550 if err := r.Column(0, &rows); err != nil { 1551 return err 1552 } 1553 if len(rows) != 1 { 1554 return fmt.Errorf("len(rows) = %d; want 1", len(rows)) 1555 } 1556 if !rows[0].Valid { 1557 return fmt.Errorf("rows[0] is NULL") 1558 } 1559 var i, j int64 1560 if err := rows[0].Row.Columns(&i, &j); err != nil { 1561 return err 1562 } 1563 if i != 1 || j != 2 { 1564 return fmt.Errorf("got (%d,%d), want (1,2)", i, j) 1565 } 1566 return nil 1567 }, 1568 }, 1569 { 1570 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`}, 1571 want: func(r *Row) error { 1572 // Test Row.ToStruct. 1573 s := struct { 1574 Col1 []*struct { 1575 Foo int64 `spanner:"foo"` 1576 Bar int64 `spanner:"bar"` 1577 } `spanner:"col1"` 1578 }{} 1579 if err := r.ToStruct(&s); err != nil { 1580 return err 1581 } 1582 want := struct { 1583 Col1 []*struct { 1584 Foo int64 `spanner:"foo"` 1585 Bar int64 `spanner:"bar"` 1586 } `spanner:"col1"` 1587 }{ 1588 Col1: []*struct { 1589 Foo int64 `spanner:"foo"` 1590 Bar int64 `spanner:"bar"` 1591 }{ 1592 { 1593 Foo: 1, 1594 Bar: 2, 1595 }, 1596 }, 1597 } 1598 if !testEqual(want, s) { 1599 return fmt.Errorf("unexpected decoding result: %v, want %v", s, want) 1600 } 1601 return nil 1602 }, 1603 }, 1604 } 1605 for i, test := range tests { 1606 iter := client.Single().Query(ctx, test.q) 1607 defer iter.Stop() 1608 row, err := iter.Next() 1609 if err != nil { 1610 t.Errorf("%d: %v", i, err) 1611 continue 1612 } 1613 if err := test.want(row); err != nil { 1614 t.Errorf("%d: %v", i, err) 1615 continue 1616 } 1617 } 1618} 1619 1620func TestIntegration_StructParametersUnsupported(t *testing.T) { 1621 skipEmulatorTest(t) 1622 t.Parallel() 1623 1624 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1625 defer cancel() 1626 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1627 defer cleanup() 1628 1629 for _, test := range []struct { 1630 param interface{} 1631 wantCode codes.Code 1632 wantMsgPart string 1633 }{ 1634 { 1635 struct { 1636 Field int 1637 }{10}, 1638 codes.Unimplemented, 1639 "Unsupported query shape: " + 1640 "A struct value cannot be returned as a column value. " + 1641 "Rewrite the query to flatten the struct fields in the result.", 1642 }, 1643 { 1644 []struct { 1645 Field int 1646 }{{10}, {20}}, 1647 codes.Unimplemented, 1648 "Unsupported query shape: " + 1649 "This query can return a null-valued array of struct, " + 1650 "which is not supported by Spanner.", 1651 }, 1652 } { 1653 iter := client.Single().Query(ctx, Statement{ 1654 SQL: "SELECT @p", 1655 Params: map[string]interface{}{"p": test.param}, 1656 }) 1657 _, err := iter.Next() 1658 iter.Stop() 1659 if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok { 1660 t.Fatal(msg) 1661 } 1662 } 1663} 1664 1665// Test queries of the form "SELECT expr". 1666func TestIntegration_QueryExpressions(t *testing.T) { 1667 t.Parallel() 1668 1669 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1670 defer cancel() 1671 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1672 defer cleanup() 1673 1674 newRow := func(vals []interface{}) *Row { 1675 row, err := NewRow(make([]string, len(vals)), vals) 1676 if err != nil { 1677 t.Fatal(err) 1678 } 1679 return row 1680 } 1681 1682 tests := []struct { 1683 expr string 1684 want interface{} 1685 }{ 1686 {"1", int64(1)}, 1687 {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}}, 1688 {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}}, 1689 {"IEEE_DIVIDE(1, 0)", math.Inf(1)}, 1690 {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)}, 1691 {"IEEE_DIVIDE(0, 0)", math.NaN()}, 1692 // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate. 1693 {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}}, 1694 {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}}, 1695 {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}}, 1696 } 1697 for _, test := range tests { 1698 iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr}) 1699 defer iter.Stop() 1700 row, err := iter.Next() 1701 if err != nil { 1702 t.Errorf("%q: %v", test.expr, err) 1703 continue 1704 } 1705 // Create new instance of type of test.want. 1706 gotp := reflect.New(reflect.TypeOf(test.want)) 1707 if err := row.Column(0, gotp.Interface()); err != nil { 1708 t.Errorf("%q: Column returned error %v", test.expr, err) 1709 continue 1710 } 1711 got := reflect.Indirect(gotp).Interface() 1712 // TODO(jba): remove isNaN special case when we have a better equality predicate. 1713 if isNaN(got) && isNaN(test.want) { 1714 continue 1715 } 1716 if !testEqual(got, test.want) { 1717 t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want) 1718 } 1719 } 1720} 1721 1722func TestIntegration_QueryStats(t *testing.T) { 1723 skipEmulatorTest(t) 1724 t.Parallel() 1725 1726 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1727 defer cancel() 1728 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1729 defer cleanup() 1730 1731 accounts := []*Mutation{ 1732 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1733 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1734 } 1735 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1736 t.Fatal(err) 1737 } 1738 const sql = "SELECT Balance FROM Accounts" 1739 1740 qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil}) 1741 if err != nil { 1742 t.Fatal(err) 1743 } 1744 if len(qp.PlanNodes) == 0 { 1745 t.Error("got zero plan nodes, expected at least one") 1746 } 1747 1748 iter := client.Single().QueryWithStats(ctx, Statement{sql, nil}) 1749 defer iter.Stop() 1750 for { 1751 _, err := iter.Next() 1752 if err == iterator.Done { 1753 break 1754 } 1755 if err != nil { 1756 t.Fatal(err) 1757 } 1758 } 1759 if iter.QueryPlan == nil { 1760 t.Error("got nil QueryPlan, expected one") 1761 } 1762 if iter.QueryStats == nil { 1763 t.Error("got nil QueryStats, expected some") 1764 } 1765} 1766 1767func TestIntegration_InvalidDatabase(t *testing.T) { 1768 t.Parallel() 1769 1770 if databaseAdmin == nil { 1771 t.Skip("Integration tests skipped") 1772 } 1773 ctx := context.Background() 1774 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID) 1775 c, err := createClient(ctx, dbPath, SessionPoolConfig{}) 1776 // Client creation should succeed even if the database is invalid. 1777 if err != nil { 1778 t.Fatal(err) 1779 } 1780 _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"}) 1781 if msg, ok := matchError(err, codes.NotFound, ""); !ok { 1782 t.Fatal(msg) 1783 } 1784} 1785 1786func TestIntegration_ReadErrors(t *testing.T) { 1787 t.Parallel() 1788 1789 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1790 defer cancel() 1791 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1792 defer cleanup() 1793 1794 var ms []*Mutation 1795 for i := 0; i < 2; i++ { 1796 ms = append(ms, InsertOrUpdate(testTable, 1797 testTableColumns, 1798 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")})) 1799 } 1800 if _, err := client.Apply(ctx, ms); err != nil { 1801 t.Fatal(err) 1802 } 1803 1804 // Read over invalid table fails 1805 _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"}) 1806 if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok { 1807 t.Error(msg) 1808 } 1809 // Read over invalid column fails 1810 _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"}) 1811 if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok { 1812 t.Error(msg) 1813 } 1814 1815 // Invalid query fails 1816 iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"}) 1817 defer iter.Stop() 1818 _, err = iter.Next() 1819 if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok { 1820 t.Error(msg) 1821 } 1822 1823 // Read should fail on cancellation. 1824 cctx, cancel := context.WithCancel(ctx) 1825 cancel() 1826 _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"}) 1827 if msg, ok := matchError(err, codes.Canceled, ""); !ok { 1828 t.Error(msg) 1829 } 1830 // Read should fail if deadline exceeded. 1831 dctx, cancel := context.WithTimeout(ctx, time.Nanosecond) 1832 defer cancel() 1833 <-dctx.Done() 1834 _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"}) 1835 if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok { 1836 t.Error(msg) 1837 } 1838 // Read should fail if there are multiple rows returned. 1839 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns) 1840 wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex) 1841 if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok { 1842 t.Error(msg) 1843 } 1844} 1845 1846// Test TransactionRunner. Test that transactions are aborted and retried as 1847// expected. 1848func TestIntegration_TransactionRunner(t *testing.T) { 1849 skipEmulatorTest(t) 1850 t.Parallel() 1851 1852 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1853 defer cancel() 1854 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1855 defer cleanup() 1856 1857 // Test 1: User error should abort the transaction. 1858 _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1859 tx.BufferWrite([]*Mutation{ 1860 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})}) 1861 return errors.New("user error") 1862 }) 1863 // Empty read. 1864 rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"})) 1865 if err != nil { 1866 t.Fatal(err) 1867 } 1868 if got, want := len(rows), 0; got != want { 1869 t.Errorf("Empty read, got %d, want %d.", got, want) 1870 } 1871 1872 // Test 2: Expect abort and retry. 1873 // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by 1874 // committing writes to the column txn2 have read, and expect the following 1875 // read to abort and txn2 retries. 1876 1877 // Set up two accounts 1878 accounts := []*Mutation{ 1879 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}), 1880 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}), 1881 } 1882 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1883 t.Fatal(err) 1884 } 1885 1886 var ( 1887 cTxn1Start = make(chan struct{}) 1888 cTxn1Commit = make(chan struct{}) 1889 cTxn2Start = make(chan struct{}) 1890 wg sync.WaitGroup 1891 ) 1892 1893 // read balance, check error if we don't expect abort. 1894 readBalance := func(tx interface { 1895 ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) 1896 }, key int64, expectAbort bool) (int64, error) { 1897 var b int64 1898 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"}) 1899 if e != nil { 1900 if expectAbort && !isAbortedErr(e) { 1901 t.Errorf("ReadRow got %v, want Abort error.", e) 1902 } 1903 // Verify that we received and are able to extract retry info from 1904 // the aborted error. 1905 if _, hasRetryInfo := ExtractRetryDelay(e); !hasRetryInfo { 1906 t.Errorf("Got Abort error without RetryInfo\nGot: %v", e) 1907 } 1908 return b, e 1909 } 1910 if ce := r.Column(0, &b); ce != nil { 1911 return b, ce 1912 } 1913 return b, nil 1914 } 1915 1916 wg.Add(2) 1917 // Txn 1 1918 go func() { 1919 defer wg.Done() 1920 var once sync.Once 1921 _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1922 b, e := readBalance(tx, 1, false) 1923 if e != nil { 1924 return e 1925 } 1926 // txn 1 can abort, in that case we skip closing the channel on 1927 // retry. 1928 once.Do(func() { close(cTxn1Start) }) 1929 e = tx.BufferWrite([]*Mutation{ 1930 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})}) 1931 if e != nil { 1932 return e 1933 } 1934 // Wait for second transaction. 1935 <-cTxn2Start 1936 return nil 1937 }) 1938 close(cTxn1Commit) 1939 if e != nil { 1940 t.Errorf("Transaction 1 commit, got %v, want nil.", e) 1941 } 1942 }() 1943 // Txn 2 1944 go func() { 1945 // Wait until txn 1 starts. 1946 <-cTxn1Start 1947 defer wg.Done() 1948 var ( 1949 once sync.Once 1950 b1 int64 1951 b2 int64 1952 e error 1953 ) 1954 _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1955 if b1, e = readBalance(tx, 1, false); e != nil { 1956 return e 1957 } 1958 // Skip closing channel on retry. 1959 once.Do(func() { close(cTxn2Start) }) 1960 // Wait until txn 1 successfully commits. 1961 <-cTxn1Commit 1962 // Txn1 has committed and written a balance to the account. Now this 1963 // transaction (txn2) reads and re-writes the balance. The first 1964 // time through, it will abort because it overlaps with txn1. Then 1965 // it will retry after txn1 commits, and succeed. 1966 if b2, e = readBalance(tx, 2, true); e != nil { 1967 return e 1968 } 1969 return tx.BufferWrite([]*Mutation{ 1970 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})}) 1971 }) 1972 if e != nil { 1973 t.Errorf("Transaction 2 commit, got %v, want nil.", e) 1974 } 1975 }() 1976 wg.Wait() 1977 // Check that both transactions' effects are visible. 1978 for i := int64(1); i <= int64(2); i++ { 1979 if b, e := readBalance(client.Single(), i, false); e != nil { 1980 t.Fatalf("ReadBalance for key %d error %v.", i, e) 1981 } else if b != i { 1982 t.Errorf("Balance for key %d, got %d, want %d.", i, b, i) 1983 } 1984 } 1985} 1986 1987// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then 1988// serialize and deserialize both transaction and partition to be used in 1989// execution on another client, and compare results. 1990func TestIntegration_BatchQuery(t *testing.T) { 1991 t.Parallel() 1992 1993 // Set up testing environment. 1994 var ( 1995 client2 *Client 1996 err error 1997 ) 1998 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1999 defer cancel() 2000 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 2001 defer cleanup() 2002 2003 if err = populate(ctx, client); err != nil { 2004 t.Fatal(err) 2005 } 2006 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 2007 t.Fatal(err) 2008 } 2009 defer client2.Close() 2010 2011 // PartitionQuery 2012 var ( 2013 txn *BatchReadOnlyTransaction 2014 partitions []*Partition 2015 stmt = Statement{SQL: "SELECT * FROM test;"} 2016 ) 2017 2018 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 2019 t.Fatal(err) 2020 } 2021 defer txn.Cleanup(ctx) 2022 if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { 2023 t.Fatal(err) 2024 } 2025 2026 // Reconstruct BatchReadOnlyTransactionID and execute partitions 2027 var ( 2028 tid2 BatchReadOnlyTransactionID 2029 data []byte 2030 gotResult bool // if we get matching result from two separate txns 2031 ) 2032 if data, err = txn.ID.MarshalBinary(); err != nil { 2033 t.Fatalf("encoding failed %v", err) 2034 } 2035 if err = tid2.UnmarshalBinary(data); err != nil { 2036 t.Fatalf("decoding failed %v", err) 2037 } 2038 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 2039 2040 // Execute Partitions and compare results 2041 for i, p := range partitions { 2042 iter := txn.Execute(ctx, p) 2043 defer iter.Stop() 2044 p2 := serdesPartition(t, i, p) 2045 iter2 := txn2.Execute(ctx, &p2) 2046 defer iter2.Stop() 2047 2048 row1, err1 := iter.Next() 2049 row2, err2 := iter2.Next() 2050 if err1 != err2 { 2051 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 2052 continue 2053 } 2054 if !testEqual(row1, row2) { 2055 t.Fatalf("execution returned different values: %v, %v", row1, row2) 2056 continue 2057 } 2058 if row1 == nil { 2059 continue 2060 } 2061 var a, b string 2062 if err = row1.Columns(&a, &b); err != nil { 2063 t.Fatalf("failed to parse row %v", err) 2064 continue 2065 } 2066 if a == str1 && b == str2 { 2067 gotResult = true 2068 } 2069 } 2070 if !gotResult { 2071 t.Fatalf("execution didn't return expected values") 2072 } 2073} 2074 2075// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery 2076func TestIntegration_BatchRead(t *testing.T) { 2077 t.Parallel() 2078 2079 // Set up testing environment. 2080 var ( 2081 client2 *Client 2082 err error 2083 ) 2084 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2085 defer cancel() 2086 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 2087 defer cleanup() 2088 2089 if err = populate(ctx, client); err != nil { 2090 t.Fatal(err) 2091 } 2092 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 2093 t.Fatal(err) 2094 } 2095 defer client2.Close() 2096 2097 // PartitionRead 2098 var ( 2099 txn *BatchReadOnlyTransaction 2100 partitions []*Partition 2101 ) 2102 2103 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 2104 t.Fatal(err) 2105 } 2106 defer txn.Cleanup(ctx) 2107 if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 2108 t.Fatal(err) 2109 } 2110 2111 // Reconstruct BatchReadOnlyTransactionID and execute partitions. 2112 var ( 2113 tid2 BatchReadOnlyTransactionID 2114 data []byte 2115 gotResult bool // if we get matching result from two separate txns 2116 ) 2117 if data, err = txn.ID.MarshalBinary(); err != nil { 2118 t.Fatalf("encoding failed %v", err) 2119 } 2120 if err = tid2.UnmarshalBinary(data); err != nil { 2121 t.Fatalf("decoding failed %v", err) 2122 } 2123 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 2124 2125 // Execute Partitions and compare results. 2126 for i, p := range partitions { 2127 iter := txn.Execute(ctx, p) 2128 defer iter.Stop() 2129 p2 := serdesPartition(t, i, p) 2130 iter2 := txn2.Execute(ctx, &p2) 2131 defer iter2.Stop() 2132 2133 row1, err1 := iter.Next() 2134 row2, err2 := iter2.Next() 2135 if err1 != err2 { 2136 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 2137 continue 2138 } 2139 if !testEqual(row1, row2) { 2140 t.Fatalf("execution returned different values: %v, %v", row1, row2) 2141 continue 2142 } 2143 if row1 == nil { 2144 continue 2145 } 2146 var a, b string 2147 if err = row1.Columns(&a, &b); err != nil { 2148 t.Fatalf("failed to parse row %v", err) 2149 continue 2150 } 2151 if a == str1 && b == str2 { 2152 gotResult = true 2153 } 2154 } 2155 if !gotResult { 2156 t.Fatalf("execution didn't return expected values") 2157 } 2158} 2159 2160// Test normal txReadEnv method on BatchReadOnlyTransaction. 2161func TestIntegration_BROTNormal(t *testing.T) { 2162 t.Parallel() 2163 2164 // Set up testing environment and create txn. 2165 var ( 2166 txn *BatchReadOnlyTransaction 2167 err error 2168 row *Row 2169 i int64 2170 ) 2171 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2172 defer cancel() 2173 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 2174 defer cleanup() 2175 2176 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 2177 t.Fatal(err) 2178 } 2179 defer txn.Cleanup(ctx) 2180 if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 2181 t.Fatal(err) 2182 } 2183 // Normal query should work with BatchReadOnlyTransaction. 2184 stmt2 := Statement{SQL: "SELECT 1"} 2185 iter := txn.Query(ctx, stmt2) 2186 defer iter.Stop() 2187 2188 row, err = iter.Next() 2189 if err != nil { 2190 t.Errorf("query failed with %v", err) 2191 } 2192 if err = row.Columns(&i); err != nil { 2193 t.Errorf("failed to parse row %v", err) 2194 } 2195} 2196 2197func TestIntegration_CommitTimestamp(t *testing.T) { 2198 t.Parallel() 2199 2200 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2201 defer cancel() 2202 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements) 2203 defer cleanup() 2204 2205 type testTableRow struct { 2206 Key string 2207 Ts NullTime 2208 } 2209 2210 var ( 2211 cts1, cts2, ts1, ts2 time.Time 2212 err error 2213 ) 2214 2215 // Apply mutation in sequence, expect to see commit timestamp in good order, 2216 // check also the commit timestamp returned 2217 for _, it := range []struct { 2218 k string 2219 t *time.Time 2220 }{ 2221 {"a", &cts1}, 2222 {"b", &cts2}, 2223 } { 2224 tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}} 2225 m, err := InsertStruct("TestTable", tt) 2226 if err != nil { 2227 t.Fatal(err) 2228 } 2229 *it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()) 2230 if err != nil { 2231 t.Fatal(err) 2232 } 2233 } 2234 2235 txn := client.ReadOnlyTransaction() 2236 for _, it := range []struct { 2237 k string 2238 t *time.Time 2239 }{ 2240 {"a", &ts1}, 2241 {"b", &ts2}, 2242 } { 2243 if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil { 2244 t.Fatal(err) 2245 } else { 2246 var got testTableRow 2247 if err := r.ToStruct(&got); err != nil { 2248 t.Fatal(err) 2249 } 2250 *it.t = got.Ts.Time 2251 } 2252 } 2253 if !cts1.Equal(ts1) { 2254 t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1) 2255 } 2256 if !cts2.Equal(ts2) { 2257 t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2) 2258 } 2259 2260 // Try writing a timestamp in the future to commit timestamp, expect error. 2261 _, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce()) 2262 if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok { 2263 t.Error(msg) 2264 } 2265} 2266 2267func TestIntegration_DML(t *testing.T) { 2268 t.Parallel() 2269 2270 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2271 defer cancel() 2272 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2273 defer cleanup() 2274 2275 // Function that reads a single row's first name from within a transaction. 2276 readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) { 2277 row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"}) 2278 if err != nil { 2279 return "", err 2280 } 2281 var fn string 2282 if err := row.Column(0, &fn); err != nil { 2283 return "", err 2284 } 2285 return fn, nil 2286 } 2287 2288 // Function that reads multiple rows' first names from outside a read/write 2289 // transaction. 2290 readFirstNames := func(keys ...int) []string { 2291 var ks []KeySet 2292 for _, k := range keys { 2293 ks = append(ks, Key{k}) 2294 } 2295 iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"}) 2296 var got []string 2297 var fn string 2298 err := iter.Do(func(row *Row) error { 2299 if err := row.Column(0, &fn); err != nil { 2300 return err 2301 } 2302 got = append(got, fn) 2303 return nil 2304 }) 2305 if err != nil { 2306 t.Fatalf("readFirstNames(%v): %v", keys, err) 2307 } 2308 return got 2309 } 2310 2311 // Use ReadWriteTransaction.Query to execute a DML statement. 2312 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2313 iter := tx.Query(ctx, Statement{ 2314 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`, 2315 }) 2316 defer iter.Stop() 2317 if row, err := iter.Next(); err != iterator.Done { 2318 t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err) 2319 } 2320 if iter.RowCount != 1 { 2321 t.Errorf("row count: got %d, want 1", iter.RowCount) 2322 } 2323 // The results of the DML statement should be visible to the transaction. 2324 got, err := readFirstName(tx, 1) 2325 if err != nil { 2326 return err 2327 } 2328 if want := "Umm"; got != want { 2329 t.Errorf("got %q, want %q", got, want) 2330 } 2331 return nil 2332 }) 2333 if err != nil { 2334 t.Fatal(err) 2335 } 2336 2337 // Use ReadWriteTransaction.Update to execute a DML statement. 2338 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2339 count, err := tx.Update(ctx, Statement{ 2340 SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`, 2341 }) 2342 if err != nil { 2343 return err 2344 } 2345 if count != 1 { 2346 t.Errorf("row count: got %d, want 1", count) 2347 } 2348 got, err := readFirstName(tx, 2) 2349 if err != nil { 2350 return err 2351 } 2352 if want := "Eduard"; got != want { 2353 t.Errorf("got %q, want %q", got, want) 2354 } 2355 return nil 2356 2357 }) 2358 if err != nil { 2359 t.Fatal(err) 2360 } 2361 2362 // Roll back a DML statement and confirm that it didn't happen. 2363 var fail = errors.New("fail") 2364 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2365 _, err := tx.Update(ctx, Statement{ 2366 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 2367 }) 2368 if err != nil { 2369 return err 2370 } 2371 return fail 2372 }) 2373 if err != fail { 2374 t.Fatalf("rolling back: got error %v, want the error 'fail'", err) 2375 } 2376 _, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 2377 if got, want := ErrCode(err), codes.NotFound; got != want { 2378 t.Errorf("got %s, want %s", got, want) 2379 } 2380 2381 // Run two DML statements in the same transaction. 2382 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2383 _, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`}) 2384 if err != nil { 2385 return err 2386 } 2387 _, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`}) 2388 if err != nil { 2389 return err 2390 } 2391 return nil 2392 }) 2393 if err != nil { 2394 t.Fatal(err) 2395 } 2396 got := readFirstNames(1, 2) 2397 want := []string{"Oum", "Eddie"} 2398 if !testEqual(got, want) { 2399 t.Errorf("got %v, want %v", got, want) 2400 } 2401 2402 // Run a DML statement and an ordinary mutation in the same transaction. 2403 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2404 _, err := tx.Update(ctx, Statement{ 2405 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 2406 }) 2407 if err != nil { 2408 return err 2409 } 2410 return tx.BufferWrite([]*Mutation{ 2411 Insert("Singers", []string{"SingerId", "FirstName", "LastName"}, 2412 []interface{}{4, "Andy", "Irvine"}), 2413 }) 2414 }) 2415 if err != nil { 2416 t.Fatal(err) 2417 } 2418 got = readFirstNames(3, 4) 2419 want = []string{"Audra", "Andy"} 2420 if !testEqual(got, want) { 2421 t.Errorf("got %v, want %v", got, want) 2422 } 2423 2424 // Attempt to run a query using update. 2425 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2426 _, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`}) 2427 return err 2428 }) 2429 if got, want := ErrCode(err), codes.InvalidArgument; got != want { 2430 t.Errorf("got %s, want %s", got, want) 2431 } 2432} 2433 2434func TestIntegration_StructParametersBind(t *testing.T) { 2435 t.Parallel() 2436 2437 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2438 defer cancel() 2439 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 2440 defer cleanup() 2441 2442 type tRow []interface{} 2443 type tRows []struct{ trow tRow } 2444 2445 type allFields struct { 2446 Stringf string 2447 Intf int 2448 Boolf bool 2449 Floatf float64 2450 Bytef []byte 2451 Timef time.Time 2452 Datef civil.Date 2453 } 2454 allColumns := []string{ 2455 "Stringf", 2456 "Intf", 2457 "Boolf", 2458 "Floatf", 2459 "Bytef", 2460 "Timef", 2461 "Datef", 2462 } 2463 s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1} 2464 s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2} 2465 2466 dynamicStructType := reflect.StructOf([]reflect.StructField{ 2467 {Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`}, 2468 }) 2469 s3 := reflect.New(dynamicStructType) 2470 s3.Elem().Field(0).Set(reflect.ValueOf(t1)) 2471 2472 for i, test := range []struct { 2473 param interface{} 2474 sql string 2475 cols []string 2476 trows tRows 2477 }{ 2478 // Struct value. 2479 { 2480 s1, 2481 "SELECT" + 2482 " @p.Stringf," + 2483 " @p.Intf," + 2484 " @p.Boolf," + 2485 " @p.Floatf," + 2486 " @p.Bytef," + 2487 " @p.Timef," + 2488 " @p.Datef", 2489 allColumns, 2490 tRows{ 2491 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 2492 }, 2493 }, 2494 // Array of struct value. 2495 { 2496 []allFields{s1, s2}, 2497 "SELECT * FROM UNNEST(@p)", 2498 allColumns, 2499 tRows{ 2500 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 2501 {tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}}, 2502 }, 2503 }, 2504 // Null struct. 2505 { 2506 (*allFields)(nil), 2507 "SELECT @p IS NULL", 2508 []string{""}, 2509 tRows{ 2510 {tRow{true}}, 2511 }, 2512 }, 2513 // Null Array of struct. 2514 { 2515 []allFields(nil), 2516 "SELECT @p IS NULL", 2517 []string{""}, 2518 tRows{ 2519 {tRow{true}}, 2520 }, 2521 }, 2522 // Empty struct. 2523 { 2524 struct{}{}, 2525 "SELECT @p IS NULL ", 2526 []string{""}, 2527 tRows{ 2528 {tRow{false}}, 2529 }, 2530 }, 2531 // Empty array of struct. 2532 { 2533 []allFields{}, 2534 "SELECT * FROM UNNEST(@p) ", 2535 allColumns, 2536 tRows{}, 2537 }, 2538 // Struct with duplicate fields. 2539 { 2540 struct { 2541 A int `spanner:"field"` 2542 B int `spanner:"field"` 2543 }{10, 20}, 2544 "SELECT * FROM UNNEST([@p]) ", 2545 []string{"field", "field"}, 2546 tRows{ 2547 {tRow{10, 20}}, 2548 }, 2549 }, 2550 // Struct with unnamed fields. 2551 { 2552 struct { 2553 A string `spanner:""` 2554 }{"hello"}, 2555 "SELECT * FROM UNNEST([@p]) ", 2556 []string{""}, 2557 tRows{ 2558 {tRow{"hello"}}, 2559 }, 2560 }, 2561 // Mixed struct. 2562 { 2563 struct { 2564 DynamicStructField interface{} `spanner:"f1"` 2565 ArrayStructField []*allFields `spanner:"f2"` 2566 }{ 2567 DynamicStructField: s3.Interface(), 2568 ArrayStructField: []*allFields{nil}, 2569 }, 2570 "SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ", 2571 []string{"ff1", "", ""}, 2572 tRows{ 2573 {tRow{t1, 1, true}}, 2574 }, 2575 }, 2576 } { 2577 iter := client.Single().Query(ctx, Statement{ 2578 SQL: test.sql, 2579 Params: map[string]interface{}{"p": test.param}, 2580 }) 2581 var gotRows []*Row 2582 err := iter.Do(func(r *Row) error { 2583 gotRows = append(gotRows, r) 2584 return nil 2585 }) 2586 if err != nil { 2587 t.Errorf("Failed to execute test case %d, error: %v", i, err) 2588 } 2589 2590 var wantRows []*Row 2591 for j, row := range test.trows { 2592 r, err := NewRow(test.cols, row.trow) 2593 if err != nil { 2594 t.Errorf("Invalid row %d in test case %d", j, i) 2595 } 2596 wantRows = append(wantRows, r) 2597 } 2598 if !testEqual(gotRows, wantRows) { 2599 t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows) 2600 } 2601 } 2602} 2603 2604func TestIntegration_PDML(t *testing.T) { 2605 t.Parallel() 2606 2607 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2608 defer cancel() 2609 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2610 defer cleanup() 2611 2612 columns := []string{"SingerId", "FirstName", "LastName"} 2613 2614 // Populate the Singers table. 2615 var muts []*Mutation 2616 for _, row := range [][]interface{}{ 2617 {1, "Umm", "Kulthum"}, 2618 {2, "Eduard", "Khil"}, 2619 {3, "Audra", "McDonald"}, 2620 {4, "Enrique", "Iglesias"}, 2621 {5, "Shakira", "Ripoll"}, 2622 } { 2623 muts = append(muts, Insert("Singers", columns, row)) 2624 } 2625 if _, err := client.Apply(ctx, muts); err != nil { 2626 t.Fatal(err) 2627 } 2628 // Identifiers in PDML statements must be fully qualified. 2629 // TODO(jba): revisit the above. 2630 count, err := client.PartitionedUpdate(ctx, Statement{ 2631 SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`, 2632 Params: map[string]interface{}{ 2633 "end": 3, 2634 }, 2635 }) 2636 if err != nil { 2637 t.Fatal(err) 2638 } 2639 if want := int64(3); count != want { 2640 t.Fatalf("got %d, want %d", count, want) 2641 } 2642 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2643 if err != nil { 2644 t.Fatal(err) 2645 } 2646 want := [][]interface{}{ 2647 {int64(1), "changed", "Kulthum"}, 2648 {int64(2), "changed", "Khil"}, 2649 {int64(3), "changed", "McDonald"}, 2650 {int64(4), "Enrique", "Iglesias"}, 2651 {int64(5), "Shakira", "Ripoll"}, 2652 } 2653 if !testEqual(got, want) { 2654 t.Errorf("\ngot %v\nwant%v", got, want) 2655 } 2656} 2657 2658func TestIntegration_BatchDML(t *testing.T) { 2659 t.Parallel() 2660 2661 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2662 defer cancel() 2663 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2664 defer cleanup() 2665 2666 columns := []string{"SingerId", "FirstName", "LastName"} 2667 2668 // Populate the Singers table. 2669 var muts []*Mutation 2670 for _, row := range [][]interface{}{ 2671 {1, "Umm", "Kulthum"}, 2672 {2, "Eduard", "Khil"}, 2673 {3, "Audra", "McDonald"}, 2674 } { 2675 muts = append(muts, Insert("Singers", columns, row)) 2676 } 2677 if _, err := client.Apply(ctx, muts); err != nil { 2678 t.Fatal(err) 2679 } 2680 2681 var counts []int64 2682 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2683 counts, err = tx.BatchUpdate(ctx, []Statement{ 2684 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2685 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2686 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2687 }) 2688 return err 2689 }) 2690 2691 if err != nil { 2692 t.Fatal(err) 2693 } 2694 if want := []int64{1, 1, 1}; !testEqual(counts, want) { 2695 t.Fatalf("got %d, want %d", counts, want) 2696 } 2697 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2698 if err != nil { 2699 t.Fatal(err) 2700 } 2701 want := [][]interface{}{ 2702 {int64(1), "changed 1", "Kulthum"}, 2703 {int64(2), "changed 2", "Khil"}, 2704 {int64(3), "changed 3", "McDonald"}, 2705 } 2706 if !testEqual(got, want) { 2707 t.Errorf("\ngot %v\nwant%v", got, want) 2708 } 2709} 2710 2711func TestIntegration_BatchDML_NoStatements(t *testing.T) { 2712 t.Parallel() 2713 2714 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2715 defer cancel() 2716 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2717 defer cleanup() 2718 2719 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2720 _, err = tx.BatchUpdate(ctx, []Statement{}) 2721 return err 2722 }) 2723 if err == nil { 2724 t.Fatal("expected error, got nil") 2725 } 2726 if s, ok := status.FromError(err); ok { 2727 if s.Code() != codes.InvalidArgument { 2728 t.Fatalf("expected InvalidArgument, got %v", err) 2729 } 2730 } else { 2731 t.Fatalf("expected InvalidArgument, got %v", err) 2732 } 2733} 2734 2735func TestIntegration_BatchDML_TwoStatements(t *testing.T) { 2736 t.Parallel() 2737 2738 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2739 defer cancel() 2740 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2741 defer cleanup() 2742 2743 columns := []string{"SingerId", "FirstName", "LastName"} 2744 2745 // Populate the Singers table. 2746 var muts []*Mutation 2747 for _, row := range [][]interface{}{ 2748 {1, "Umm", "Kulthum"}, 2749 {2, "Eduard", "Khil"}, 2750 {3, "Audra", "McDonald"}, 2751 } { 2752 muts = append(muts, Insert("Singers", columns, row)) 2753 } 2754 if _, err := client.Apply(ctx, muts); err != nil { 2755 t.Fatal(err) 2756 } 2757 2758 var updateCount int64 2759 var batchCounts []int64 2760 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2761 batchCounts, err = tx.BatchUpdate(ctx, []Statement{ 2762 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2763 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2764 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2765 }) 2766 if err != nil { 2767 return err 2768 } 2769 2770 updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}) 2771 return err 2772 }) 2773 if err != nil { 2774 t.Fatal(err) 2775 } 2776 if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) { 2777 t.Fatalf("got %d, want %d", batchCounts, want) 2778 } 2779 if updateCount != 1 { 2780 t.Fatalf("got %v, want 1", updateCount) 2781 } 2782} 2783 2784func TestIntegration_BatchDML_Error(t *testing.T) { 2785 t.Parallel() 2786 2787 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2788 defer cancel() 2789 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2790 defer cleanup() 2791 2792 columns := []string{"SingerId", "FirstName", "LastName"} 2793 2794 // Populate the Singers table. 2795 var muts []*Mutation 2796 for _, row := range [][]interface{}{ 2797 {1, "Umm", "Kulthum"}, 2798 {2, "Eduard", "Khil"}, 2799 {3, "Audra", "McDonald"}, 2800 } { 2801 muts = append(muts, Insert("Singers", columns, row)) 2802 } 2803 if _, err := client.Apply(ctx, muts); err != nil { 2804 t.Fatal(err) 2805 } 2806 2807 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2808 counts, err := tx.BatchUpdate(ctx, []Statement{ 2809 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2810 {SQL: `some illegal statement`}, 2811 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2812 }) 2813 if err == nil { 2814 t.Fatal("expected err, got nil") 2815 } 2816 // The statement may also have been aborted by Spanner, and in that 2817 // case we should just let the client library retry the transaction. 2818 if status.Code(err) == codes.Aborted { 2819 return err 2820 } 2821 if want := []int64{1}; !testEqual(counts, want) { 2822 t.Fatalf("got %d, want %d", counts, want) 2823 } 2824 2825 got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns)) 2826 // Aborted error is ok, just retry the transaction. 2827 if status.Code(err) == codes.Aborted { 2828 return err 2829 } 2830 if err != nil { 2831 t.Fatal(err) 2832 } 2833 want := [][]interface{}{ 2834 {int64(1), "changed 1", "Kulthum"}, 2835 {int64(2), "Eduard", "Khil"}, 2836 {int64(3), "Audra", "McDonald"}, 2837 } 2838 if !testEqual(got, want) { 2839 t.Errorf("\ngot %v\nwant%v", got, want) 2840 } 2841 2842 return nil 2843 }) 2844 if err != nil { 2845 t.Fatal(err) 2846 } 2847} 2848 2849func TestIntegration_StartBackupOperation(t *testing.T) { 2850 skipEmulatorTest(t) 2851 t.Parallel() 2852 2853 // Backups can be slow, so use a 30 minute timeout. 2854 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) 2855 defer cancel() 2856 _, testDatabaseName, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, backuDBStatements) 2857 defer cleanup() 2858 2859 backupID := backupIDSpace.New() 2860 backupName := fmt.Sprintf("projects/%s/instances/%s/backups/%s", testProjectID, testInstanceID, backupID) 2861 // Minimum expiry time is 6 hours 2862 expires := time.Now().Add(time.Hour * 7) 2863 respLRO, err := databaseAdmin.StartBackupOperation(ctx, backupID, testDatabaseName, expires) 2864 if err != nil { 2865 t.Fatal(err) 2866 } 2867 2868 _, err = respLRO.Wait(ctx) 2869 if err != nil { 2870 t.Fatal(err) 2871 } 2872 respMetadata, err := respLRO.Metadata() 2873 if err != nil { 2874 t.Fatalf("backup response metadata, got error %v, want nil", err) 2875 } 2876 if respMetadata.Database != testDatabaseName { 2877 t.Fatalf("backup database name, got %s, want %s", respMetadata.Database, testDatabaseName) 2878 } 2879 if respMetadata.Progress.ProgressPercent != 100 { 2880 t.Fatalf("backup progress percent, got %d, want 100", respMetadata.Progress.ProgressPercent) 2881 } 2882 respCheck, err := databaseAdmin.GetBackup(ctx, &adminpb.GetBackupRequest{Name: backupName}) 2883 if err != nil { 2884 t.Fatalf("backup metadata, got error %v, want nil", err) 2885 } 2886 if respCheck.CreateTime == nil { 2887 t.Fatal("backup create time, got nil, want non-nil") 2888 } 2889 if respCheck.State != adminpb.Backup_READY { 2890 t.Fatalf("backup state, got %v, want %v", respCheck.State, adminpb.Backup_READY) 2891 } 2892 if respCheck.SizeBytes == 0 { 2893 t.Fatalf("backup size, got %d, want non-zero", respCheck.SizeBytes) 2894 } 2895} 2896 2897// Prepare initializes Cloud Spanner testing DB and clients. 2898func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { 2899 if databaseAdmin == nil { 2900 t.Skip("Integration tests skipped") 2901 } 2902 // Construct a unique test DB name. 2903 dbName := dbNameSpace.New() 2904 2905 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName) 2906 // Create database and tables. 2907 op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{ 2908 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 2909 CreateStatement: "CREATE DATABASE " + dbName, 2910 ExtraStatements: statements, 2911 }) 2912 if err != nil { 2913 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2914 } 2915 if _, err := op.Wait(ctx); err != nil { 2916 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2917 } 2918 client, err := createClient(ctx, dbPath, spc) 2919 if err != nil { 2920 t.Fatalf("cannot create data client on DB %v: %v", dbPath, err) 2921 } 2922 return client, dbPath, func() { 2923 client.Close() 2924 } 2925} 2926 2927func cleanupInstances() { 2928 if instanceAdmin == nil { 2929 // Integration tests skipped. 2930 return 2931 } 2932 2933 ctx := context.Background() 2934 parent := fmt.Sprintf("projects/%v", testProjectID) 2935 iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{ 2936 Parent: parent, 2937 Filter: "name:gotest-", 2938 }) 2939 expireAge := 24 * time.Hour 2940 2941 for { 2942 inst, err := iter.Next() 2943 if err == iterator.Done { 2944 break 2945 } 2946 if err != nil { 2947 panic(err) 2948 } 2949 2950 _, instID, err := parseInstanceName(inst.Name) 2951 if err != nil { 2952 log.Printf("Error: %v\n", err) 2953 continue 2954 } 2955 if instanceNameSpace.Older(instID, expireAge) { 2956 log.Printf("Deleting instance %s", inst.Name) 2957 deleteInstanceAndBackups(ctx, inst.Name) 2958 } 2959 } 2960} 2961 2962func deleteInstanceAndBackups(ctx context.Context, instanceName string) { 2963 // First delete any lingering backups that might have been left on 2964 // the instance. 2965 backups := databaseAdmin.ListBackups(ctx, &adminpb.ListBackupsRequest{Parent: instanceName}) 2966 for { 2967 backup, err := backups.Next() 2968 if err == iterator.Done { 2969 break 2970 } 2971 if err != nil { 2972 log.Printf("failed to retrieve backups from instance %s because of error %v", instanceName, err) 2973 break 2974 } 2975 if err := databaseAdmin.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{Name: backup.Name}); err != nil { 2976 log.Printf("failed to delete backup %s (error %v)", backup.Name, err) 2977 } 2978 } 2979 2980 if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: instanceName}); err != nil { 2981 log.Printf("failed to delete instance %s (error %v), might need a manual removal", 2982 instanceName, err) 2983 } 2984} 2985 2986func rangeReads(ctx context.Context, t *testing.T, client *Client) { 2987 checkRange := func(ks KeySet, wantNums ...int) { 2988 if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok { 2989 t.Errorf("key set %+v: %s", ks, msg) 2990 } 2991 } 2992 2993 checkRange(Key{"k1"}, 1) 2994 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4) 2995 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5) 2996 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5) 2997 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4) 2998 2999 // Partial key specification. 3000 checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9) 3001 checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9) 3002 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10) 3003 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11) 3004 3005 // The following produce empty ranges. 3006 // TODO(jba): Consider a multi-part key to illustrate partial key behavior. 3007 // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen}) 3008 // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen}) 3009 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen}) 3010 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed}) 3011 3012 // Prefix is component-wise, not string prefix. 3013 checkRange(Key{"k1"}.AsPrefix(), 1) 3014 checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 3015 3016 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 3017} 3018 3019func indexRangeReads(ctx context.Context, t *testing.T, client *Client) { 3020 checkRange := func(ks KeySet, wantNums ...int) { 3021 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns), 3022 wantNums); !ok { 3023 t.Errorf("key set %+v: %s", ks, msg) 3024 } 3025 } 3026 3027 checkRange(Key{"v1"}, 1) 3028 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4) 3029 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5) 3030 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5) 3031 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4) 3032 3033 // // Partial key specification. 3034 checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9) 3035 checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9) 3036 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10) 3037 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11) 3038 3039 // // The following produce empty ranges. 3040 // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen}) 3041 // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen}) 3042 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen}) 3043 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed}) 3044 3045 // // Prefix is component-wise, not string prefix. 3046 checkRange(Key{"v1"}.AsPrefix(), 1) 3047 checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 3048 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 3049 3050 // Read from an index with DESC ordering. 3051 wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} 3052 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns), 3053 wantNums); !ok { 3054 t.Errorf("desc: %s", msg) 3055 } 3056} 3057 3058type testTableRow struct{ Key, StringValue string } 3059 3060func compareRows(iter *RowIterator, wantNums []int) (string, bool) { 3061 rows, err := readAllTestTable(iter) 3062 if err != nil { 3063 return err.Error(), false 3064 } 3065 want := map[string]string{} 3066 for _, n := range wantNums { 3067 want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n) 3068 } 3069 got := map[string]string{} 3070 for _, r := range rows { 3071 got[r.Key] = r.StringValue 3072 } 3073 if !testEqual(got, want) { 3074 return fmt.Sprintf("got %v, want %v", got, want), false 3075 } 3076 return "", true 3077} 3078 3079func isNaN(x interface{}) bool { 3080 f, ok := x.(float64) 3081 if !ok { 3082 return false 3083 } 3084 return math.IsNaN(f) 3085} 3086 3087// createClient creates Cloud Spanner data client. 3088func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) { 3089 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}) 3090 if err != nil { 3091 return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) 3092 } 3093 return client, nil 3094} 3095 3096// populate prepares the database with some data. 3097func populate(ctx context.Context, client *Client) error { 3098 // Populate data 3099 var err error 3100 m := InsertMap("test", map[string]interface{}{ 3101 "a": str1, 3102 "b": str2, 3103 }) 3104 _, err = client.Apply(ctx, []*Mutation{m}) 3105 return err 3106} 3107 3108func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) { 3109 if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) { 3110 return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false 3111 } 3112 return "", true 3113} 3114 3115func rowToValues(r *Row) ([]interface{}, error) { 3116 var x int64 3117 var y, z string 3118 if err := r.Column(0, &x); err != nil { 3119 return nil, err 3120 } 3121 if err := r.Column(1, &y); err != nil { 3122 return nil, err 3123 } 3124 if err := r.Column(2, &z); err != nil { 3125 return nil, err 3126 } 3127 return []interface{}{x, y, z}, nil 3128} 3129 3130func readAll(iter *RowIterator) ([][]interface{}, error) { 3131 defer iter.Stop() 3132 var vals [][]interface{} 3133 for { 3134 row, err := iter.Next() 3135 if err == iterator.Done { 3136 return vals, nil 3137 } 3138 if err != nil { 3139 return nil, err 3140 } 3141 v, err := rowToValues(row) 3142 if err != nil { 3143 return nil, err 3144 } 3145 vals = append(vals, v) 3146 } 3147} 3148 3149func readAllTestTable(iter *RowIterator) ([]testTableRow, error) { 3150 defer iter.Stop() 3151 var vals []testTableRow 3152 for { 3153 row, err := iter.Next() 3154 if err == iterator.Done { 3155 return vals, nil 3156 } 3157 if err != nil { 3158 return nil, err 3159 } 3160 var ttr testTableRow 3161 if err := row.ToStruct(&ttr); err != nil { 3162 return nil, err 3163 } 3164 vals = append(vals, ttr) 3165 } 3166} 3167 3168func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) { 3169 defer iter.Stop() 3170 var vals [][]interface{} 3171 for { 3172 row, err := iter.Next() 3173 if err == iterator.Done { 3174 return vals, nil 3175 } 3176 if err != nil { 3177 return nil, err 3178 } 3179 var id, balance int64 3180 var nickname string 3181 err = row.Columns(&id, &nickname, &balance) 3182 if err != nil { 3183 return nil, err 3184 } 3185 vals = append(vals, []interface{}{id, nickname, balance}) 3186 } 3187} 3188 3189func maxDuration(a, b time.Duration) time.Duration { 3190 if a > b { 3191 return a 3192 } 3193 return b 3194} 3195 3196func isEmulatorEnvSet() bool { 3197 return os.Getenv("SPANNER_EMULATOR_HOST") != "" 3198} 3199 3200func skipEmulatorTest(t *testing.T) { 3201 if isEmulatorEnvSet() { 3202 t.Skip("Skipping testing against the emulator.") 3203 } 3204} 3205