1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "math/big" 27 "os" 28 "os/exec" 29 "reflect" 30 "regexp" 31 "strings" 32 "sync" 33 "testing" 34 "time" 35 36 "cloud.google.com/go/civil" 37 "cloud.google.com/go/internal/testutil" 38 "cloud.google.com/go/internal/uid" 39 database "cloud.google.com/go/spanner/admin/database/apiv1" 40 instance "cloud.google.com/go/spanner/admin/instance/apiv1" 41 "google.golang.org/api/iterator" 42 "google.golang.org/api/option" 43 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 44 instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" 45 sppb "google.golang.org/genproto/googleapis/spanner/v1" 46 "google.golang.org/grpc" 47 "google.golang.org/grpc/codes" 48 "google.golang.org/grpc/peer" 49 "google.golang.org/grpc/status" 50) 51 52const ( 53 directPathIPV6Prefix = "[2001:4860:8040" 54 directPathIPV4Prefix = "34.126" 55) 56 57var ( 58 // testProjectID specifies the project used for testing. It can be changed 59 // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. 60 testProjectID = testutil.ProjID() 61 62 // spannerHost specifies the spanner API host used for testing. It can be changed 63 // by setting the environment variable GCLOUD_TESTS_GOLANG_SPANNER_HOST 64 spannerHost = getSpannerHost() 65 66 dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 67 instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true}) 68 backupIDSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 69 testInstanceID = instanceNameSpace.New() 70 71 testTable = "TestTable" 72 testTableIndex = "TestTableByValue" 73 testTableColumns = []string{"Key", "StringValue"} 74 75 databaseAdmin *database.DatabaseAdminClient 76 instanceAdmin *instance.InstanceAdminClient 77 78 dpConfig directPathTestConfig 79 peerInfo *peer.Peer 80 81 singerDBStatements = []string{ 82 `CREATE TABLE Singers ( 83 SingerId INT64 NOT NULL, 84 FirstName STRING(1024), 85 LastName STRING(1024), 86 SingerInfo BYTES(MAX) 87 ) PRIMARY KEY (SingerId)`, 88 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 89 `CREATE TABLE Accounts ( 90 AccountId INT64 NOT NULL, 91 Nickname STRING(100), 92 Balance INT64 NOT NULL, 93 ) PRIMARY KEY (AccountId)`, 94 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 95 `CREATE TABLE Types ( 96 RowID INT64 NOT NULL, 97 String STRING(MAX), 98 StringArray ARRAY<STRING(MAX)>, 99 Bytes BYTES(MAX), 100 BytesArray ARRAY<BYTES(MAX)>, 101 Int64a INT64, 102 Int64Array ARRAY<INT64>, 103 Bool BOOL, 104 BoolArray ARRAY<BOOL>, 105 Float64 FLOAT64, 106 Float64Array ARRAY<FLOAT64>, 107 Date DATE, 108 DateArray ARRAY<DATE>, 109 Timestamp TIMESTAMP, 110 TimestampArray ARRAY<TIMESTAMP>, 111 ) PRIMARY KEY (RowID)`, 112 } 113 114 readDBStatements = []string{ 115 `CREATE TABLE TestTable ( 116 Key STRING(MAX) NOT NULL, 117 StringValue STRING(MAX) 118 ) PRIMARY KEY (Key)`, 119 `CREATE INDEX TestTableByValue ON TestTable(StringValue)`, 120 `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`, 121 } 122 123 simpleDBStatements = []string{ 124 `CREATE TABLE test ( 125 a STRING(1024), 126 b STRING(1024), 127 ) PRIMARY KEY (a)`, 128 } 129 simpleDBTableColumns = []string{"a", "b"} 130 131 ctsDBStatements = []string{ 132 `CREATE TABLE TestTable ( 133 Key STRING(MAX) NOT NULL, 134 Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true), 135 ) PRIMARY KEY (Key)`, 136 } 137 backuDBStatements = []string{ 138 `CREATE TABLE Singers ( 139 SingerId INT64 NOT NULL, 140 FirstName STRING(1024), 141 LastName STRING(1024), 142 SingerInfo BYTES(MAX) 143 ) PRIMARY KEY (SingerId)`, 144 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 145 `CREATE TABLE Accounts ( 146 AccountId INT64 NOT NULL, 147 Nickname STRING(100), 148 Balance INT64 NOT NULL, 149 ) PRIMARY KEY (AccountId)`, 150 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 151 `CREATE TABLE Types ( 152 RowID INT64 NOT NULL, 153 String STRING(MAX), 154 StringArray ARRAY<STRING(MAX)>, 155 Bytes BYTES(MAX), 156 BytesArray ARRAY<BYTES(MAX)>, 157 Int64a INT64, 158 Int64Array ARRAY<INT64>, 159 Bool BOOL, 160 BoolArray ARRAY<BOOL>, 161 Float64 FLOAT64, 162 Float64Array ARRAY<FLOAT64>, 163 Date DATE, 164 DateArray ARRAY<DATE>, 165 Timestamp TIMESTAMP, 166 TimestampArray ARRAY<TIMESTAMP>, 167 ) PRIMARY KEY (RowID)`, 168 } 169 170 validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$") 171 172 blackholeDpv6Cmd string 173 blackholeDpv4Cmd string 174 allowDpv6Cmd string 175 allowDpv4Cmd string 176) 177 178func init() { 179 flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag") 180 flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM") 181 peerInfo = &peer.Peer{} 182 // Use sysctl or iptables to blackhole DirectPath IP for fallback tests. 183 flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") 184 flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") 185 flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") 186 flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") 187} 188 189type directPathTestConfig struct { 190 attemptDirectPath bool 191 directPathIPv4Only bool 192} 193 194func parseInstanceName(inst string) (project, instance string, err error) { 195 matches := validInstancePattern.FindStringSubmatch(inst) 196 if len(matches) == 0 { 197 return "", "", fmt.Errorf("Failed to parse instance name from %q according to pattern %q", 198 inst, validInstancePattern.String()) 199 } 200 return matches[1], matches[2], nil 201} 202 203func getSpannerHost() string { 204 return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_HOST") 205} 206 207const ( 208 str1 = "alice" 209 str2 = "a@example.com" 210) 211 212func TestMain(m *testing.M) { 213 cleanup := initIntegrationTests() 214 res := m.Run() 215 cleanup() 216 os.Exit(res) 217} 218 219var grpcHeaderChecker = testutil.DefaultHeadersEnforcer() 220 221func initIntegrationTests() (cleanup func()) { 222 ctx := context.Background() 223 flag.Parse() // Needed for testing.Short(). 224 noop := func() {} 225 226 if testing.Short() { 227 log.Println("Integration tests skipped in -short mode.") 228 return noop 229 } 230 231 if testProjectID == "" { 232 log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 233 return noop 234 } 235 236 opts := grpcHeaderChecker.CallOptions() 237 if spannerHost != "" { 238 opts = append(opts, option.WithEndpoint(spannerHost)) 239 } 240 if dpConfig.attemptDirectPath { 241 opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo)))) 242 } 243 var err error 244 // Create InstanceAdmin and DatabaseAdmin clients. 245 instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) 246 if err != nil { 247 log.Fatalf("cannot create instance databaseAdmin client: %v", err) 248 } 249 databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...) 250 if err != nil { 251 log.Fatalf("cannot create databaseAdmin client: %v", err) 252 } 253 // Get the list of supported instance configs for the project that is used 254 // for the integration tests. The supported instance configs can differ per 255 // project. The integration tests will use the first instance config that 256 // is returned by Cloud Spanner. This will normally be the regional config 257 // that is physically the closest to where the request is coming from. 258 configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{ 259 Parent: fmt.Sprintf("projects/%s", testProjectID), 260 }) 261 config, err := configIterator.Next() 262 if err != nil { 263 log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err) 264 } 265 266 // First clean up any old test instances before we start the actual testing 267 // as these might cause this test run to fail. 268 cleanupInstances() 269 270 // Create a test instance to use for this test run. 271 op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{ 272 Parent: fmt.Sprintf("projects/%s", testProjectID), 273 InstanceId: testInstanceID, 274 Instance: &instancepb.Instance{ 275 Config: config.Name, 276 DisplayName: testInstanceID, 277 NodeCount: 1, 278 }, 279 }) 280 if err != nil { 281 log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err) 282 } 283 // Wait for the instance creation to finish. 284 i, err := op.Wait(ctx) 285 if err != nil { 286 log.Fatalf("waiting for instance creation to finish failed: %v", err) 287 } 288 if i.State != instancepb.Instance_READY { 289 log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State) 290 } 291 292 return func() { 293 // Delete this test instance. 294 instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID) 295 deleteInstanceAndBackups(ctx, instanceName) 296 // Delete other test instances that may be lingering around. 297 cleanupInstances() 298 databaseAdmin.Close() 299 instanceAdmin.Close() 300 } 301} 302 303func TestIntegration_InitSessionPool(t *testing.T) { 304 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 305 defer cancel() 306 // Set up an empty testing environment. 307 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{}) 308 defer cleanup() 309 sp := client.idleSessions 310 sp.mu.Lock() 311 want := sp.MinOpened 312 sp.mu.Unlock() 313 var numOpened int 314loop: 315 for { 316 select { 317 case <-ctx.Done(): 318 t.Fatalf("timed out, got %d session(s), want %d", numOpened, want) 319 default: 320 sp.mu.Lock() 321 numOpened = sp.idleList.Len() + sp.idleWriteList.Len() 322 sp.mu.Unlock() 323 if uint64(numOpened) == want { 324 break loop 325 } 326 } 327 } 328 // Delete all sessions in the pool on the backend and then try to execute a 329 // simple query. The 'Session not found' error should cause an automatic 330 // retry of the read-only transaction. 331 sp.mu.Lock() 332 s := sp.idleList.Front() 333 for { 334 if s == nil { 335 break 336 } 337 // This will delete the session on the backend without removing it 338 // from the pool. 339 s.Value.(*session).delete(context.Background()) 340 s = s.Next() 341 } 342 sp.mu.Unlock() 343 sql := "SELECT 1, 'FOO', 'BAR'" 344 tx := client.ReadOnlyTransaction() 345 defer tx.Close() 346 iter := tx.Query(context.Background(), NewStatement(sql)) 347 rows, err := readAll(iter) 348 if err != nil { 349 t.Fatalf("Unexpected error for query %q: %v", sql, err) 350 } 351 if got, want := len(rows), 1; got != want { 352 t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 353 } 354 if got, want := len(rows[0]), 3; got != want { 355 t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 356 } 357 if got, want := rows[0][0].(int64), int64(1); got != want { 358 t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want) 359 } 360} 361 362// Test SingleUse transaction. 363func TestIntegration_SingleUse(t *testing.T) { 364 t.Parallel() 365 366 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 367 defer cancel() 368 // Set up testing environment. 369 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 370 defer cleanup() 371 372 writes := []struct { 373 row []interface{} 374 ts time.Time 375 }{ 376 {row: []interface{}{1, "Marc", "Foo"}}, 377 {row: []interface{}{2, "Tars", "Bar"}}, 378 {row: []interface{}{3, "Alpha", "Beta"}}, 379 {row: []interface{}{4, "Last", "End"}}, 380 } 381 // Try to write four rows through the Apply API. 382 for i, w := range writes { 383 var err error 384 m := InsertOrUpdate("Singers", 385 []string{"SingerId", "FirstName", "LastName"}, 386 w.row) 387 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 388 t.Fatal(err) 389 } 390 verifyDirectPathRemoteAddress(t) 391 } 392 // Calculate time difference between Cloud Spanner server and localhost to 393 // use to determine the exact staleness value to use. 394 timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0) 395 396 // Test reading rows with different timestamp bounds. 397 for i, test := range []struct { 398 name string 399 want [][]interface{} 400 tb TimestampBound 401 checkTs func(time.Time) error 402 }{ 403 { 404 name: "strong", 405 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 406 tb: StrongRead(), 407 checkTs: func(ts time.Time) error { 408 // writes[3] is the last write, all subsequent strong read 409 // should have a timestamp larger than that. 410 if ts.Before(writes[3].ts) { 411 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 412 } 413 return nil 414 }, 415 }, 416 { 417 name: "min_read_timestamp", 418 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 419 tb: MinReadTimestamp(writes[3].ts), 420 checkTs: func(ts time.Time) error { 421 if ts.Before(writes[3].ts) { 422 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 423 } 424 return nil 425 }, 426 }, 427 { 428 name: "max_staleness", 429 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 430 tb: MaxStaleness(time.Second), 431 checkTs: func(ts time.Time) error { 432 if ts.Before(writes[3].ts) { 433 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts) 434 } 435 return nil 436 }, 437 }, 438 { 439 name: "read_timestamp", 440 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 441 tb: ReadTimestamp(writes[2].ts), 442 checkTs: func(ts time.Time) error { 443 if ts != writes[2].ts { 444 return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts) 445 } 446 return nil 447 }, 448 }, 449 { 450 name: "exact_staleness", 451 want: nil, 452 // Specify a staleness which should be already before this test. 453 tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second), 454 checkTs: func(ts time.Time) error { 455 if !ts.Before(writes[0].ts) { 456 return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts) 457 } 458 return nil 459 }, 460 }, 461 } { 462 t.Run(test.name, func(t *testing.T) { 463 // SingleUse.Query 464 su := client.Single().WithTimestampBound(test.tb) 465 got, err := readAll(su.Query( 466 ctx, 467 Statement{ 468 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId", 469 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 470 })) 471 if err != nil { 472 t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) 473 } 474 verifyDirectPathRemoteAddress(t) 475 rts, err := su.Timestamp() 476 if err != nil { 477 t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) 478 } 479 if err := test.checkTs(rts); err != nil { 480 t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err) 481 } 482 if !testEqual(got, test.want) { 483 t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want) 484 } 485 // SingleUse.Read 486 su = client.Single().WithTimestampBound(test.tb) 487 got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 488 if err != nil { 489 t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) 490 } 491 verifyDirectPathRemoteAddress(t) 492 rts, err = su.Timestamp() 493 if err != nil { 494 t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) 495 } 496 if err := test.checkTs(rts); err != nil { 497 t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err) 498 } 499 if !testEqual(got, test.want) { 500 t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want) 501 } 502 // SingleUse.ReadRow 503 got = nil 504 for _, k := range []Key{{1}, {3}, {4}} { 505 su = client.Single().WithTimestampBound(test.tb) 506 r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 507 if err != nil { 508 continue 509 } 510 verifyDirectPathRemoteAddress(t) 511 v, err := rowToValues(r) 512 if err != nil { 513 continue 514 } 515 got = append(got, v) 516 rts, err = su.Timestamp() 517 if err != nil { 518 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 519 } 520 if err := test.checkTs(rts); err != nil { 521 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 522 } 523 } 524 if !testEqual(got, test.want) { 525 t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want) 526 } 527 // SingleUse.ReadUsingIndex 528 su = client.Single().WithTimestampBound(test.tb) 529 got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 530 if err != nil { 531 t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) 532 } 533 verifyDirectPathRemoteAddress(t) 534 // The results from ReadUsingIndex is sorted by the index rather than primary key. 535 if len(got) != len(test.want) { 536 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 537 } 538 for j, g := range got { 539 if j > 0 { 540 prev := got[j-1][1].(string) + got[j-1][2].(string) 541 curr := got[j][1].(string) + got[j][2].(string) 542 if strings.Compare(prev, curr) > 0 { 543 t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 544 } 545 } 546 found := false 547 for _, w := range test.want { 548 if testEqual(g, w) { 549 found = true 550 } 551 } 552 if !found { 553 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 554 break 555 } 556 } 557 rts, err = su.Timestamp() 558 if err != nil { 559 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 560 } 561 if err := test.checkTs(rts); err != nil { 562 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 563 } 564 // SingleUse.ReadRowUsingIndex 565 got = nil 566 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 567 su = client.Single().WithTimestampBound(test.tb) 568 r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 569 if err != nil { 570 continue 571 } 572 verifyDirectPathRemoteAddress(t) 573 v, err := rowToValues(r) 574 if err != nil { 575 continue 576 } 577 got = append(got, v) 578 rts, err = su.Timestamp() 579 if err != nil { 580 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 581 } 582 if err := test.checkTs(rts); err != nil { 583 t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 584 } 585 } 586 if !testEqual(got, test.want) { 587 t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want) 588 } 589 }) 590 } 591} 592 593// Test custom query options provided on query-level configuration. 594func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) { 595 skipEmulatorTest(t) 596 t.Parallel() 597 598 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 599 defer cancel() 600 // Set up testing environment. 601 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 602 defer cleanup() 603 604 writes := []struct { 605 row []interface{} 606 ts time.Time 607 }{ 608 {row: []interface{}{1, "Marc", "Foo"}}, 609 {row: []interface{}{2, "Tars", "Bar"}}, 610 {row: []interface{}{3, "Alpha", "Beta"}}, 611 {row: []interface{}{4, "Last", "End"}}, 612 } 613 // Try to write four rows through the Apply API. 614 for i, w := range writes { 615 var err error 616 m := InsertOrUpdate("Singers", 617 []string{"SingerId", "FirstName", "LastName"}, 618 w.row) 619 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 620 t.Fatal(err) 621 } 622 } 623 qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}} 624 got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{ 625 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 626 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 627 }, qo)) 628 629 if err != nil { 630 t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err) 631 } 632 633 want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}} 634 if !testEqual(got, want) { 635 t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want) 636 } 637} 638 639func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) { 640 t.Parallel() 641 642 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 643 defer cancel() 644 // Set up testing environment. 645 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 646 defer cleanup() 647 648 writes := []struct { 649 row []interface{} 650 ts time.Time 651 }{ 652 {row: []interface{}{1, "Marc", "Foo"}}, 653 {row: []interface{}{2, "Tars", "Bar"}}, 654 {row: []interface{}{3, "Alpha", "Beta"}}, 655 {row: []interface{}{4, "Last", "End"}}, 656 } 657 // Try to write four rows through the Apply API. 658 for i, w := range writes { 659 var err error 660 m := InsertOrUpdate("Singers", 661 []string{"SingerId", "FirstName", "LastName"}, 662 w.row) 663 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 664 t.Fatal(err) 665 } 666 } 667 668 su := client.Single() 669 const limit = 1 670 gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), 671 []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit})) 672 if err != nil { 673 t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err) 674 } 675 if got, want := len(gotRows), limit; got != want { 676 t.Errorf("got %d, want %d", got, want) 677 } 678} 679 680// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it 681// also tests for a single timestamp across multiple reads. 682func TestIntegration_ReadOnlyTransaction(t *testing.T) { 683 t.Parallel() 684 685 ctxTimeout := 5 * time.Minute 686 ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) 687 defer cancel() 688 // Set up testing environment. 689 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 690 defer cleanup() 691 692 writes := []struct { 693 row []interface{} 694 ts time.Time 695 }{ 696 {row: []interface{}{1, "Marc", "Foo"}}, 697 {row: []interface{}{2, "Tars", "Bar"}}, 698 {row: []interface{}{3, "Alpha", "Beta"}}, 699 {row: []interface{}{4, "Last", "End"}}, 700 } 701 // Try to write four rows through the Apply API. 702 for i, w := range writes { 703 var err error 704 m := InsertOrUpdate("Singers", 705 []string{"SingerId", "FirstName", "LastName"}, 706 w.row) 707 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 708 t.Fatal(err) 709 } 710 } 711 712 // For testing timestamp bound staleness. 713 <-time.After(time.Second) 714 715 // Test reading rows with different timestamp bounds. 716 for i, test := range []struct { 717 want [][]interface{} 718 tb TimestampBound 719 checkTs func(time.Time) error 720 }{ 721 // Note: min_read_timestamp and max_staleness are not supported by 722 // ReadOnlyTransaction. See API document for more details. 723 { 724 // strong 725 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 726 StrongRead(), 727 func(ts time.Time) error { 728 if ts.Before(writes[3].ts) { 729 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 730 } 731 return nil 732 }, 733 }, 734 { 735 // read_timestamp 736 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 737 ReadTimestamp(writes[2].ts), 738 func(ts time.Time) error { 739 if ts != writes[2].ts { 740 return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts) 741 } 742 return nil 743 }, 744 }, 745 { 746 // exact_staleness 747 nil, 748 // Specify a staleness which should be already before this test. 749 ExactStaleness(ctxTimeout + 1), 750 func(ts time.Time) error { 751 if ts.After(writes[0].ts) { 752 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 753 } 754 return nil 755 }, 756 }, 757 } { 758 // ReadOnlyTransaction.Query 759 ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb) 760 got, err := readAll(ro.Query( 761 ctx, 762 Statement{ 763 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4) ORDER BY SingerId", 764 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 765 })) 766 if err != nil { 767 t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err) 768 } 769 if !testEqual(got, test.want) { 770 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want) 771 } 772 rts, err := ro.Timestamp() 773 if err != nil { 774 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err) 775 } 776 if err := test.checkTs(rts); err != nil { 777 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err) 778 } 779 roTs := rts 780 // ReadOnlyTransaction.Read 781 got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 782 if err != nil { 783 t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err) 784 } 785 if !testEqual(got, test.want) { 786 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want) 787 } 788 rts, err = ro.Timestamp() 789 if err != nil { 790 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err) 791 } 792 if err := test.checkTs(rts); err != nil { 793 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err) 794 } 795 if roTs != rts { 796 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 797 } 798 // ReadOnlyTransaction.ReadRow 799 got = nil 800 for _, k := range []Key{{1}, {3}, {4}} { 801 r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 802 if err != nil { 803 continue 804 } 805 v, err := rowToValues(r) 806 if err != nil { 807 continue 808 } 809 got = append(got, v) 810 rts, err = ro.Timestamp() 811 if err != nil { 812 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 813 } 814 if err := test.checkTs(rts); err != nil { 815 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 816 } 817 if roTs != rts { 818 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 819 } 820 } 821 if !testEqual(got, test.want) { 822 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want) 823 } 824 // SingleUse.ReadUsingIndex 825 got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 826 if err != nil { 827 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err) 828 } 829 // The results from ReadUsingIndex is sorted by the index rather than 830 // primary key. 831 if len(got) != len(test.want) { 832 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 833 } 834 for j, g := range got { 835 if j > 0 { 836 prev := got[j-1][1].(string) + got[j-1][2].(string) 837 curr := got[j][1].(string) + got[j][2].(string) 838 if strings.Compare(prev, curr) > 0 { 839 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 840 } 841 } 842 found := false 843 for _, w := range test.want { 844 if testEqual(g, w) { 845 found = true 846 } 847 } 848 if !found { 849 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 850 break 851 } 852 } 853 rts, err = ro.Timestamp() 854 if err != nil { 855 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 856 } 857 if err := test.checkTs(rts); err != nil { 858 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 859 } 860 if roTs != rts { 861 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 862 } 863 // ReadOnlyTransaction.ReadRowUsingIndex 864 got = nil 865 for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} { 866 r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"}) 867 if err != nil { 868 continue 869 } 870 v, err := rowToValues(r) 871 if err != nil { 872 continue 873 } 874 got = append(got, v) 875 rts, err = ro.Timestamp() 876 if err != nil { 877 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err) 878 } 879 if err := test.checkTs(rts); err != nil { 880 t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err) 881 } 882 if roTs != rts { 883 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 884 } 885 } 886 if !testEqual(got, test.want) { 887 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want) 888 } 889 ro.Close() 890 } 891} 892 893// Test ReadOnlyTransaction with different timestamp bound when there's an 894// update at the same time. 895func TestIntegration_UpdateDuringRead(t *testing.T) { 896 t.Parallel() 897 898 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 899 defer cancel() 900 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 901 defer cleanup() 902 903 for i, tb := range []TimestampBound{ 904 StrongRead(), 905 ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour 906 ExactStaleness(time.Minute * 30), 907 } { 908 ro := client.ReadOnlyTransaction().WithTimestampBound(tb) 909 _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 910 if ErrCode(err) != codes.NotFound { 911 t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err) 912 } 913 914 m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i}) 915 if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 916 t.Fatal(err) 917 } 918 919 _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 920 if ErrCode(err) != codes.NotFound { 921 t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err) 922 } 923 } 924} 925 926// Test ReadWriteTransaction. 927func TestIntegration_ReadWriteTransaction(t *testing.T) { 928 t.Parallel() 929 930 // Give a longer deadline because of transaction backoffs. 931 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 932 defer cancel() 933 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 934 defer cleanup() 935 936 // Set up two accounts 937 accounts := []*Mutation{ 938 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 939 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 940 } 941 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 942 t.Fatal(err) 943 } 944 wg := sync.WaitGroup{} 945 946 readBalance := func(iter *RowIterator) (int64, error) { 947 defer iter.Stop() 948 var bal int64 949 for { 950 row, err := iter.Next() 951 if err == iterator.Done { 952 return bal, nil 953 } 954 if err != nil { 955 return 0, err 956 } 957 if err := row.Column(0, &bal); err != nil { 958 return 0, err 959 } 960 } 961 } 962 963 for i := 0; i < 20; i++ { 964 wg.Add(1) 965 go func(iter int) { 966 defer wg.Done() 967 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 968 // Query Foo's balance and Bar's balance. 969 bf, e := readBalance(tx.Query(ctx, 970 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 971 if e != nil { 972 return e 973 } 974 verifyDirectPathRemoteAddress(t) 975 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 976 if e != nil { 977 return e 978 } 979 verifyDirectPathRemoteAddress(t) 980 if bf <= 0 { 981 return nil 982 } 983 bf-- 984 bb++ 985 return tx.BufferWrite([]*Mutation{ 986 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 987 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 988 }) 989 }) 990 if err != nil { 991 t.Errorf("%d: failed to execute transaction: %v", iter, err) 992 } 993 verifyDirectPathRemoteAddress(t) 994 }(i) 995 } 996 // Because of context timeout, all goroutines will eventually return. 997 wg.Wait() 998 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 999 var bf, bb int64 1000 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"}) 1001 if e != nil { 1002 return e 1003 } 1004 verifyDirectPathRemoteAddress(t) 1005 if ce := r.Column(0, &bf); ce != nil { 1006 return ce 1007 } 1008 bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"})) 1009 if e != nil { 1010 return e 1011 } 1012 verifyDirectPathRemoteAddress(t) 1013 if bf != 30 || bb != 21 { 1014 t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) 1015 } 1016 return nil 1017 }) 1018 if err != nil { 1019 t.Errorf("failed to check balances: %v", err) 1020 } 1021 verifyDirectPathRemoteAddress(t) 1022} 1023 1024// Test ReadWriteTransactionWithOptions. 1025func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) { 1026 t.Parallel() 1027 skipEmulatorTest(t) 1028 1029 // Give a longer deadline because of transaction backoffs. 1030 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1031 defer cancel() 1032 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1033 defer cleanup() 1034 1035 // Set up two accounts 1036 accounts := []*Mutation{ 1037 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1038 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1039 } 1040 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1041 t.Fatal(err) 1042 } 1043 1044 readBalance := func(iter *RowIterator) (int64, error) { 1045 defer iter.Stop() 1046 var bal int64 1047 for { 1048 row, err := iter.Next() 1049 if err == iterator.Done { 1050 return bal, nil 1051 } 1052 if err != nil { 1053 return 0, err 1054 } 1055 if err := row.Column(0, &bal); err != nil { 1056 return 0, err 1057 } 1058 } 1059 } 1060 1061 txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}} 1062 resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1063 // Query Foo's balance and Bar's balance. 1064 bf, e := readBalance(tx.Query(ctx, 1065 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 1066 if e != nil { 1067 return e 1068 } 1069 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 1070 if e != nil { 1071 return e 1072 } 1073 if bf <= 0 { 1074 return nil 1075 } 1076 bf-- 1077 bb++ 1078 return tx.BufferWrite([]*Mutation{ 1079 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 1080 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 1081 }) 1082 }, txOpts) 1083 if err != nil { 1084 t.Fatalf("Failed to execute transaction: %v", err) 1085 } 1086 if resp.CommitStats == nil { 1087 t.Fatal("Missing commit stats in commit response") 1088 } 1089 if got, want := resp.CommitStats.MutationCount, int64(8); got != want { 1090 t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want) 1091 } 1092} 1093 1094func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) { 1095 t.Parallel() 1096 skipEmulatorTest(t) 1097 1098 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1099 defer cancel() 1100 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1101 defer cleanup() 1102 1103 // Set up two accounts 1104 accounts := []*Mutation{ 1105 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1106 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1107 } 1108 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1109 t.Fatal(err) 1110 } 1111 1112 getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) { 1113 row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"}) 1114 if err != nil { 1115 return 0, err 1116 } 1117 var balance int64 1118 if err := row.Column(0, &balance); err != nil { 1119 return 0, err 1120 } 1121 return balance, nil 1122 } 1123 1124 statements := func(txn *ReadWriteStmtBasedTransaction) error { 1125 outBalance, err := getBalance(txn, Key{1}) 1126 if err != nil { 1127 return err 1128 } 1129 const transferAmt = 20 1130 if outBalance >= transferAmt { 1131 inBalance, err := getBalance(txn, Key{2}) 1132 if err != nil { 1133 return err 1134 } 1135 inBalance += transferAmt 1136 outBalance -= transferAmt 1137 cols := []string{"AccountId", "Balance"} 1138 txn.BufferWrite([]*Mutation{ 1139 Update("Accounts", cols, []interface{}{1, outBalance}), 1140 Update("Accounts", cols, []interface{}{2, inBalance}), 1141 }) 1142 } 1143 return nil 1144 } 1145 1146 for { 1147 tx, err := NewReadWriteStmtBasedTransaction(ctx, client) 1148 if err != nil { 1149 t.Fatalf("failed to begin a transaction: %v", err) 1150 } 1151 err = statements(tx) 1152 if err != nil && status.Code(err) != codes.Aborted { 1153 tx.Rollback(ctx) 1154 t.Fatalf("failed to execute statements: %v", err) 1155 } else if err == nil { 1156 _, err = tx.Commit(ctx) 1157 if err == nil { 1158 break 1159 } else if status.Code(err) != codes.Aborted { 1160 t.Fatalf("failed to commit a transaction: %v", err) 1161 } 1162 } 1163 // Set a default sleep time if the server delay is absent. 1164 delay := 10 * time.Millisecond 1165 if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay { 1166 delay = serverDelay 1167 } 1168 time.Sleep(delay) 1169 } 1170 1171 // Query the updated values. 1172 stmt := Statement{SQL: `SELECT AccountId, Nickname, Balance FROM Accounts ORDER BY AccountId`} 1173 iter := client.Single().Query(ctx, stmt) 1174 got, err := readAllAccountsTable(iter) 1175 if err != nil { 1176 t.Fatalf("failed to read data: %v", err) 1177 } 1178 want := [][]interface{}{ 1179 {int64(1), "Foo", int64(30)}, 1180 {int64(2), "Bar", int64(21)}, 1181 } 1182 if !testEqual(got, want) { 1183 t.Errorf("\ngot %v\nwant%v", got, want) 1184 } 1185} 1186 1187func TestIntegration_ReadWriteTransaction_StatementBasedWithOptions(t *testing.T) { 1188 t.Parallel() 1189 skipEmulatorTest(t) 1190 1191 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1192 defer cancel() 1193 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1194 defer cleanup() 1195 1196 // Set up two accounts 1197 accounts := []*Mutation{ 1198 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1199 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1200 } 1201 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1202 t.Fatal(err) 1203 } 1204 1205 getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) { 1206 row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"}) 1207 if err != nil { 1208 return 0, err 1209 } 1210 var balance int64 1211 if err := row.Column(0, &balance); err != nil { 1212 return 0, err 1213 } 1214 return balance, nil 1215 } 1216 1217 statements := func(txn *ReadWriteStmtBasedTransaction) error { 1218 outBalance, err := getBalance(txn, Key{1}) 1219 if err != nil { 1220 return err 1221 } 1222 const transferAmt = 20 1223 if outBalance >= transferAmt { 1224 inBalance, err := getBalance(txn, Key{2}) 1225 if err != nil { 1226 return err 1227 } 1228 inBalance += transferAmt 1229 outBalance -= transferAmt 1230 cols := []string{"AccountId", "Balance"} 1231 txn.BufferWrite([]*Mutation{ 1232 Update("Accounts", cols, []interface{}{1, outBalance}), 1233 Update("Accounts", cols, []interface{}{2, inBalance}), 1234 }) 1235 } 1236 return nil 1237 } 1238 1239 var resp CommitResponse 1240 txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}} 1241 for { 1242 tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, txOpts) 1243 if err != nil { 1244 t.Fatalf("failed to begin a transaction: %v", err) 1245 } 1246 err = statements(tx) 1247 if err != nil && status.Code(err) != codes.Aborted { 1248 tx.Rollback(ctx) 1249 t.Fatalf("failed to execute statements: %v", err) 1250 } else if err == nil { 1251 resp, err = tx.CommitWithReturnResp(ctx) 1252 if err == nil { 1253 break 1254 } else if status.Code(err) != codes.Aborted { 1255 t.Fatalf("failed to commit a transaction: %v", err) 1256 } 1257 } 1258 // Set a default sleep time if the server delay is absent. 1259 delay := 10 * time.Millisecond 1260 if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay { 1261 delay = serverDelay 1262 } 1263 time.Sleep(delay) 1264 } 1265 if resp.CommitStats == nil { 1266 t.Fatal("Missing commit stats in commit response") 1267 } 1268 if got, want := resp.CommitStats.MutationCount, int64(8); got != want { 1269 t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want) 1270 } 1271} 1272 1273func TestIntegration_Reads(t *testing.T) { 1274 t.Parallel() 1275 1276 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1277 defer cancel() 1278 // Set up testing environment. 1279 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1280 defer cleanup() 1281 1282 // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2". 1283 var ms []*Mutation 1284 for i := 0; i < 15; i++ { 1285 ms = append(ms, InsertOrUpdate(testTable, 1286 testTableColumns, 1287 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1288 } 1289 // Don't use ApplyAtLeastOnce, so we can test the other code path. 1290 if _, err := client.Apply(ctx, ms); err != nil { 1291 t.Fatal(err) 1292 } 1293 verifyDirectPathRemoteAddress(t) 1294 1295 // Empty read. 1296 rows, err := readAllTestTable(client.Single().Read(ctx, testTable, 1297 KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns)) 1298 if err != nil { 1299 t.Fatal(err) 1300 } 1301 verifyDirectPathRemoteAddress(t) 1302 if got, want := len(rows), 0; got != want { 1303 t.Errorf("got %d, want %d", got, want) 1304 } 1305 1306 // Index empty read. 1307 rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, 1308 KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns)) 1309 if err != nil { 1310 t.Fatal(err) 1311 } 1312 verifyDirectPathRemoteAddress(t) 1313 if got, want := len(rows), 0; got != want { 1314 t.Errorf("got %d, want %d", got, want) 1315 } 1316 1317 // Point read. 1318 row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns) 1319 if err != nil { 1320 t.Fatal(err) 1321 } 1322 verifyDirectPathRemoteAddress(t) 1323 var got testTableRow 1324 if err := row.ToStruct(&got); err != nil { 1325 t.Fatal(err) 1326 } 1327 if want := (testTableRow{"k1", "v1"}); got != want { 1328 t.Errorf("got %v, want %v", got, want) 1329 } 1330 1331 // Point read not found. 1332 _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns) 1333 if ErrCode(err) != codes.NotFound { 1334 t.Fatalf("got %v, want NotFound", err) 1335 } 1336 verifyDirectPathRemoteAddress(t) 1337 1338 // Index point read. 1339 rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns) 1340 if err != nil { 1341 t.Fatal(err) 1342 } 1343 verifyDirectPathRemoteAddress(t) 1344 var gotIndex testTableRow 1345 if err := rowIndex.ToStruct(&gotIndex); err != nil { 1346 t.Fatal(err) 1347 } 1348 if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex { 1349 t.Errorf("got %v, want %v", gotIndex, wantIndex) 1350 } 1351 // Index point read not found. 1352 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns) 1353 if ErrCode(err) != codes.NotFound { 1354 t.Fatalf("got %v, want NotFound", err) 1355 } 1356 verifyDirectPathRemoteAddress(t) 1357 rangeReads(ctx, t, client) 1358 indexRangeReads(ctx, t, client) 1359} 1360 1361func TestIntegration_EarlyTimestamp(t *testing.T) { 1362 t.Parallel() 1363 1364 // Test that we can get the timestamp from a read-only transaction as 1365 // soon as we have read at least one row. 1366 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1367 defer cancel() 1368 // Set up testing environment. 1369 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 1370 defer cleanup() 1371 1372 var ms []*Mutation 1373 for i := 0; i < 3; i++ { 1374 ms = append(ms, InsertOrUpdate(testTable, 1375 testTableColumns, 1376 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 1377 } 1378 if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil { 1379 t.Fatal(err) 1380 } 1381 1382 txn := client.Single() 1383 iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1384 defer iter.Stop() 1385 // In single-use transaction, we should get an error before reading anything. 1386 if _, err := txn.Timestamp(); err == nil { 1387 t.Error("wanted error, got nil") 1388 } 1389 // After reading one row, the timestamp should be available. 1390 _, err := iter.Next() 1391 if err != nil { 1392 t.Fatal(err) 1393 } 1394 if _, err := txn.Timestamp(); err != nil { 1395 t.Errorf("got %v, want nil", err) 1396 } 1397 1398 txn = client.ReadOnlyTransaction() 1399 defer txn.Close() 1400 iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns) 1401 defer iter.Stop() 1402 // In an ordinary read-only transaction, the timestamp should be 1403 // available immediately. 1404 if _, err := txn.Timestamp(); err != nil { 1405 t.Errorf("got %v, want nil", err) 1406 } 1407} 1408 1409func TestIntegration_NestedTransaction(t *testing.T) { 1410 t.Parallel() 1411 1412 // You cannot use a transaction from inside a read-write transaction. 1413 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1414 defer cancel() 1415 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1416 defer cleanup() 1417 1418 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1419 _, err := client.ReadWriteTransaction(ctx, 1420 func(context.Context, *ReadWriteTransaction) error { return nil }) 1421 if ErrCode(err) != codes.FailedPrecondition { 1422 t.Fatalf("got %v, want FailedPrecondition", err) 1423 } 1424 _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1425 if ErrCode(err) != codes.FailedPrecondition { 1426 t.Fatalf("got %v, want FailedPrecondition", err) 1427 } 1428 rot := client.ReadOnlyTransaction() 1429 defer rot.Close() 1430 _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 1431 if ErrCode(err) != codes.FailedPrecondition { 1432 t.Fatalf("got %v, want FailedPrecondition", err) 1433 } 1434 return nil 1435 }) 1436 if err != nil { 1437 t.Fatal(err) 1438 } 1439} 1440 1441func TestIntegration_CreateDBRetry(t *testing.T) { 1442 t.Parallel() 1443 1444 if databaseAdmin == nil { 1445 t.Skip("Integration tests skipped") 1446 } 1447 skipEmulatorTest(t) 1448 1449 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1450 defer cancel() 1451 dbName := dbNameSpace.New() 1452 1453 // Simulate an Unavailable error on the CreateDatabase RPC. 1454 hasReturnedUnavailable := false 1455 interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1456 err := invoker(ctx, method, req, reply, cc, opts...) 1457 if !hasReturnedUnavailable && err == nil { 1458 hasReturnedUnavailable = true 1459 return status.Error(codes.Unavailable, "Injected unavailable error") 1460 } 1461 return err 1462 } 1463 1464 dbAdmin, err := database.NewDatabaseAdminClient(ctx, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptor))) 1465 if err != nil { 1466 log.Fatalf("cannot create dbAdmin client: %v", err) 1467 } 1468 op, err := dbAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{ 1469 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 1470 CreateStatement: "CREATE DATABASE " + dbName, 1471 }) 1472 if err != nil { 1473 t.Fatalf("failed to create database: %v", err) 1474 } 1475 _, err = op.Wait(ctx) 1476 if err != nil { 1477 t.Fatalf("failed to create database: %v", err) 1478 } 1479 if !hasReturnedUnavailable { 1480 t.Fatalf("interceptor did not return Unavailable") 1481 } 1482} 1483 1484// Test client recovery on database recreation. 1485func TestIntegration_DbRemovalRecovery(t *testing.T) { 1486 t.Parallel() 1487 1488 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1489 defer cancel() 1490 // Create a client with MinOpened=0 to prevent the session pool maintainer 1491 // from repeatedly trying to create sessions for the invalid database. 1492 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, singerDBStatements) 1493 defer cleanup() 1494 1495 // Drop the testing database. 1496 if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 1497 t.Fatalf("failed to drop testing database %v: %v", dbPath, err) 1498 } 1499 1500 // Now, send the query. 1501 iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1502 defer iter.Stop() 1503 if _, err := iter.Next(); err == nil { 1504 t.Errorf("client sends query to removed database successfully, want it to fail") 1505 } 1506 verifyDirectPathRemoteAddress(t) 1507 1508 // Recreate database and table. 1509 dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] 1510 op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{ 1511 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 1512 CreateStatement: "CREATE DATABASE " + dbName, 1513 ExtraStatements: []string{ 1514 `CREATE TABLE Singers ( 1515 SingerId INT64 NOT NULL, 1516 FirstName STRING(1024), 1517 LastName STRING(1024), 1518 SingerInfo BYTES(MAX) 1519 ) PRIMARY KEY (SingerId)`, 1520 }, 1521 }) 1522 if err != nil { 1523 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1524 } 1525 if _, err := op.Wait(ctx); err != nil { 1526 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 1527 } 1528 1529 // Now, send the query again. 1530 iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 1531 defer iter.Stop() 1532 _, err = iter.Next() 1533 if err != nil && err != iterator.Done { 1534 t.Errorf("failed to send query to database %v: %v", dbPath, err) 1535 } 1536 verifyDirectPathRemoteAddress(t) 1537} 1538 1539// Test encoding/decoding non-struct Cloud Spanner types. 1540func TestIntegration_BasicTypes(t *testing.T) { 1541 t.Parallel() 1542 1543 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1544 defer cancel() 1545 stmts := singerDBStatements 1546 if !isEmulatorEnvSet() { 1547 stmts = []string{ 1548 `CREATE TABLE Singers ( 1549 SingerId INT64 NOT NULL, 1550 FirstName STRING(1024), 1551 LastName STRING(1024), 1552 SingerInfo BYTES(MAX) 1553 ) PRIMARY KEY (SingerId)`, 1554 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 1555 `CREATE TABLE Accounts ( 1556 AccountId INT64 NOT NULL, 1557 Nickname STRING(100), 1558 Balance INT64 NOT NULL, 1559 ) PRIMARY KEY (AccountId)`, 1560 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 1561 `CREATE TABLE Types ( 1562 RowID INT64 NOT NULL, 1563 String STRING(MAX), 1564 StringArray ARRAY<STRING(MAX)>, 1565 Bytes BYTES(MAX), 1566 BytesArray ARRAY<BYTES(MAX)>, 1567 Int64a INT64, 1568 Int64Array ARRAY<INT64>, 1569 Bool BOOL, 1570 BoolArray ARRAY<BOOL>, 1571 Float64 FLOAT64, 1572 Float64Array ARRAY<FLOAT64>, 1573 Date DATE, 1574 DateArray ARRAY<DATE>, 1575 Timestamp TIMESTAMP, 1576 TimestampArray ARRAY<TIMESTAMP>, 1577 Numeric NUMERIC, 1578 NumericArray ARRAY<NUMERIC> 1579 ) PRIMARY KEY (RowID)`, 1580 } 1581 } 1582 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts) 1583 defer cleanup() 1584 1585 t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z") 1586 // Boundaries 1587 t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z") 1588 t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z") 1589 d1, _ := civil.ParseDate("2016-11-15") 1590 // Boundaries 1591 d2, _ := civil.ParseDate("0001-01-01") 1592 d3, _ := civil.ParseDate("9999-12-31") 1593 1594 n0 := big.Rat{} 1595 n1p, _ := (&big.Rat{}).SetString("123456789") 1596 n2p, _ := (&big.Rat{}).SetString("123456789/1000000000") 1597 n1 := *n1p 1598 n2 := *n2p 1599 1600 tests := []struct { 1601 col string 1602 val interface{} 1603 want interface{} 1604 }{ 1605 {col: "String", val: ""}, 1606 {col: "String", val: "", want: NullString{"", true}}, 1607 {col: "String", val: "foo"}, 1608 {col: "String", val: "foo", want: NullString{"foo", true}}, 1609 {col: "String", val: NullString{"bar", true}, want: "bar"}, 1610 {col: "String", val: NullString{"bar", false}, want: NullString{"", false}}, 1611 {col: "String", val: nil, want: NullString{}}, 1612 {col: "StringArray", val: []string(nil), want: []NullString(nil)}, 1613 {col: "StringArray", val: []string{}, want: []NullString{}}, 1614 {col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}}, 1615 {col: "StringArray", val: []NullString(nil)}, 1616 {col: "StringArray", val: []NullString{}}, 1617 {col: "StringArray", val: []NullString{{"foo", true}, {}}}, 1618 {col: "Bytes", val: []byte{}}, 1619 {col: "Bytes", val: []byte{1, 2, 3}}, 1620 {col: "Bytes", val: []byte(nil)}, 1621 {col: "BytesArray", val: [][]byte(nil)}, 1622 {col: "BytesArray", val: [][]byte{}}, 1623 {col: "BytesArray", val: [][]byte{{1}, {2, 3}}}, 1624 {col: "Int64a", val: 0, want: int64(0)}, 1625 {col: "Int64a", val: -1, want: int64(-1)}, 1626 {col: "Int64a", val: 2, want: int64(2)}, 1627 {col: "Int64a", val: int64(3)}, 1628 {col: "Int64a", val: 4, want: NullInt64{4, true}}, 1629 {col: "Int64a", val: NullInt64{5, true}, want: int64(5)}, 1630 {col: "Int64a", val: NullInt64{6, true}, want: int64(6)}, 1631 {col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}}, 1632 {col: "Int64a", val: nil, want: NullInt64{}}, 1633 {col: "Int64Array", val: []int(nil), want: []NullInt64(nil)}, 1634 {col: "Int64Array", val: []int{}, want: []NullInt64{}}, 1635 {col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1636 {col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)}, 1637 {col: "Int64Array", val: []int64{}, want: []NullInt64{}}, 1638 {col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 1639 {col: "Int64Array", val: []NullInt64(nil)}, 1640 {col: "Int64Array", val: []NullInt64{}}, 1641 {col: "Int64Array", val: []NullInt64{{1, true}, {}}}, 1642 {col: "Bool", val: false}, 1643 {col: "Bool", val: true}, 1644 {col: "Bool", val: false, want: NullBool{false, true}}, 1645 {col: "Bool", val: true, want: NullBool{true, true}}, 1646 {col: "Bool", val: NullBool{true, true}}, 1647 {col: "Bool", val: NullBool{false, false}}, 1648 {col: "Bool", val: nil, want: NullBool{}}, 1649 {col: "BoolArray", val: []bool(nil), want: []NullBool(nil)}, 1650 {col: "BoolArray", val: []bool{}, want: []NullBool{}}, 1651 {col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}}, 1652 {col: "BoolArray", val: []NullBool(nil)}, 1653 {col: "BoolArray", val: []NullBool{}}, 1654 {col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}}, 1655 {col: "Float64", val: 0.0}, 1656 {col: "Float64", val: 3.14}, 1657 {col: "Float64", val: math.NaN()}, 1658 {col: "Float64", val: math.Inf(1)}, 1659 {col: "Float64", val: math.Inf(-1)}, 1660 {col: "Float64", val: 2.78, want: NullFloat64{2.78, true}}, 1661 {col: "Float64", val: NullFloat64{2.71, true}, want: 2.71}, 1662 {col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}}, 1663 {col: "Float64", val: NullFloat64{0, false}}, 1664 {col: "Float64", val: nil, want: NullFloat64{}}, 1665 {col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)}, 1666 {col: "Float64Array", val: []float64{}, want: []NullFloat64{}}, 1667 {col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}}, 1668 {col: "Float64Array", val: []NullFloat64(nil)}, 1669 {col: "Float64Array", val: []NullFloat64{}}, 1670 {col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}}, 1671 {col: "Date", val: d1}, 1672 {col: "Date", val: d1, want: NullDate{d1, true}}, 1673 {col: "Date", val: NullDate{d1, true}}, 1674 {col: "Date", val: NullDate{d1, true}, want: d1}, 1675 {col: "Date", val: NullDate{civil.Date{}, false}}, 1676 {col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)}, 1677 {col: "DateArray", val: []civil.Date{}, want: []NullDate{}}, 1678 {col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}}, 1679 {col: "Timestamp", val: t1}, 1680 {col: "Timestamp", val: t1, want: NullTime{t1, true}}, 1681 {col: "Timestamp", val: NullTime{t1, true}}, 1682 {col: "Timestamp", val: NullTime{t1, true}, want: t1}, 1683 {col: "Timestamp", val: NullTime{}}, 1684 {col: "Timestamp", val: nil, want: NullTime{}}, 1685 {col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)}, 1686 {col: "TimestampArray", val: []time.Time{}, want: []NullTime{}}, 1687 {col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}}, 1688 } 1689 1690 if !isEmulatorEnvSet() { 1691 for _, tc := range []struct { 1692 col string 1693 val interface{} 1694 want interface{} 1695 }{ 1696 {col: "Numeric", val: n1}, 1697 {col: "Numeric", val: n2}, 1698 {col: "Numeric", val: n1, want: NullNumeric{n1, true}}, 1699 {col: "Numeric", val: n2, want: NullNumeric{n2, true}}, 1700 {col: "Numeric", val: NullNumeric{n1, true}, want: n1}, 1701 {col: "Numeric", val: NullNumeric{n1, true}, want: NullNumeric{n1, true}}, 1702 {col: "Numeric", val: NullNumeric{n0, false}}, 1703 {col: "Numeric", val: nil, want: NullNumeric{}}, 1704 {col: "NumericArray", val: []big.Rat(nil), want: []NullNumeric(nil)}, 1705 {col: "NumericArray", val: []big.Rat{}, want: []NullNumeric{}}, 1706 {col: "NumericArray", val: []big.Rat{n1, n2}, want: []NullNumeric{{n1, true}, {n2, true}}}, 1707 {col: "NumericArray", val: []NullNumeric(nil)}, 1708 {col: "NumericArray", val: []NullNumeric{}}, 1709 {col: "NumericArray", val: []NullNumeric{{n1, true}, {n2, true}, {}}}, 1710 } { 1711 tests = append(tests, tc) 1712 } 1713 } 1714 1715 // Write rows into table first. 1716 var muts []*Mutation 1717 for i, test := range tests { 1718 muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val})) 1719 } 1720 if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil { 1721 t.Fatal(err) 1722 } 1723 1724 for i, test := range tests { 1725 row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col}) 1726 if err != nil { 1727 t.Fatalf("Unable to fetch row %v: %v", i, err) 1728 } 1729 verifyDirectPathRemoteAddress(t) 1730 // Create new instance of type of test.want. 1731 want := test.want 1732 if want == nil { 1733 want = test.val 1734 } 1735 gotp := reflect.New(reflect.TypeOf(want)) 1736 if err := row.Column(0, gotp.Interface()); err != nil { 1737 t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err) 1738 continue 1739 } 1740 got := reflect.Indirect(gotp).Interface() 1741 1742 // One of the test cases is checking NaN handling. Given 1743 // NaN!=NaN, we can't use reflect to test for it. 1744 if isNaN(got) && isNaN(want) { 1745 continue 1746 } 1747 1748 // Check non-NaN cases. 1749 if !testEqual(got, want) { 1750 t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want) 1751 continue 1752 } 1753 } 1754} 1755 1756// Test decoding Cloud Spanner STRUCT type. 1757func TestIntegration_StructTypes(t *testing.T) { 1758 t.Parallel() 1759 1760 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1761 defer cancel() 1762 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1763 defer cleanup() 1764 1765 tests := []struct { 1766 q Statement 1767 want func(r *Row) error 1768 }{ 1769 { 1770 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`}, 1771 want: func(r *Row) error { 1772 // Test STRUCT ARRAY decoding to []NullRow. 1773 var rows []NullRow 1774 if err := r.Column(0, &rows); err != nil { 1775 return err 1776 } 1777 if len(rows) != 1 { 1778 return fmt.Errorf("len(rows) = %d; want 1", len(rows)) 1779 } 1780 if !rows[0].Valid { 1781 return fmt.Errorf("rows[0] is NULL") 1782 } 1783 var i, j int64 1784 if err := rows[0].Row.Columns(&i, &j); err != nil { 1785 return err 1786 } 1787 if i != 1 || j != 2 { 1788 return fmt.Errorf("got (%d,%d), want (1,2)", i, j) 1789 } 1790 return nil 1791 }, 1792 }, 1793 { 1794 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`}, 1795 want: func(r *Row) error { 1796 // Test Row.ToStruct. 1797 s := struct { 1798 Col1 []*struct { 1799 Foo int64 `spanner:"foo"` 1800 Bar int64 `spanner:"bar"` 1801 } `spanner:"col1"` 1802 }{} 1803 if err := r.ToStruct(&s); err != nil { 1804 return err 1805 } 1806 want := struct { 1807 Col1 []*struct { 1808 Foo int64 `spanner:"foo"` 1809 Bar int64 `spanner:"bar"` 1810 } `spanner:"col1"` 1811 }{ 1812 Col1: []*struct { 1813 Foo int64 `spanner:"foo"` 1814 Bar int64 `spanner:"bar"` 1815 }{ 1816 { 1817 Foo: 1, 1818 Bar: 2, 1819 }, 1820 }, 1821 } 1822 if !testEqual(want, s) { 1823 return fmt.Errorf("unexpected decoding result: %v, want %v", s, want) 1824 } 1825 return nil 1826 }, 1827 }, 1828 } 1829 for i, test := range tests { 1830 iter := client.Single().Query(ctx, test.q) 1831 defer iter.Stop() 1832 row, err := iter.Next() 1833 if err != nil { 1834 t.Errorf("%d: %v", i, err) 1835 continue 1836 } 1837 if err := test.want(row); err != nil { 1838 t.Errorf("%d: %v", i, err) 1839 continue 1840 } 1841 } 1842} 1843 1844func TestIntegration_StructParametersUnsupported(t *testing.T) { 1845 skipEmulatorTest(t) 1846 t.Parallel() 1847 1848 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1849 defer cancel() 1850 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1851 defer cleanup() 1852 1853 for _, test := range []struct { 1854 param interface{} 1855 wantCode codes.Code 1856 wantMsgPart string 1857 }{ 1858 { 1859 struct { 1860 Field int 1861 }{10}, 1862 codes.Unimplemented, 1863 "Unsupported query shape: " + 1864 "A struct value cannot be returned as a column value. " + 1865 "Rewrite the query to flatten the struct fields in the result.", 1866 }, 1867 { 1868 []struct { 1869 Field int 1870 }{{10}, {20}}, 1871 codes.Unimplemented, 1872 "Unsupported query shape: " + 1873 "This query can return a null-valued array of struct, " + 1874 "which is not supported by Spanner.", 1875 }, 1876 } { 1877 iter := client.Single().Query(ctx, Statement{ 1878 SQL: "SELECT @p", 1879 Params: map[string]interface{}{"p": test.param}, 1880 }) 1881 _, err := iter.Next() 1882 iter.Stop() 1883 if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok { 1884 t.Fatal(msg) 1885 } 1886 } 1887} 1888 1889// Test queries of the form "SELECT expr". 1890func TestIntegration_QueryExpressions(t *testing.T) { 1891 t.Parallel() 1892 1893 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1894 defer cancel() 1895 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 1896 defer cleanup() 1897 1898 newRow := func(vals []interface{}) *Row { 1899 row, err := NewRow(make([]string, len(vals)), vals) 1900 if err != nil { 1901 t.Fatal(err) 1902 } 1903 return row 1904 } 1905 1906 tests := []struct { 1907 expr string 1908 want interface{} 1909 }{ 1910 {"1", int64(1)}, 1911 {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}}, 1912 {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}}, 1913 {"IEEE_DIVIDE(1, 0)", math.Inf(1)}, 1914 {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)}, 1915 {"IEEE_DIVIDE(0, 0)", math.NaN()}, 1916 // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate. 1917 {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}}, 1918 {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}}, 1919 {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}}, 1920 } 1921 for _, test := range tests { 1922 iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr}) 1923 defer iter.Stop() 1924 row, err := iter.Next() 1925 if err != nil { 1926 t.Errorf("%q: %v", test.expr, err) 1927 continue 1928 } 1929 // Create new instance of type of test.want. 1930 gotp := reflect.New(reflect.TypeOf(test.want)) 1931 if err := row.Column(0, gotp.Interface()); err != nil { 1932 t.Errorf("%q: Column returned error %v", test.expr, err) 1933 continue 1934 } 1935 got := reflect.Indirect(gotp).Interface() 1936 // TODO(jba): remove isNaN special case when we have a better equality predicate. 1937 if isNaN(got) && isNaN(test.want) { 1938 continue 1939 } 1940 if !testEqual(got, test.want) { 1941 t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want) 1942 } 1943 } 1944} 1945 1946func TestIntegration_QueryStats(t *testing.T) { 1947 skipEmulatorTest(t) 1948 t.Parallel() 1949 1950 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1951 defer cancel() 1952 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 1953 defer cleanup() 1954 1955 accounts := []*Mutation{ 1956 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1957 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1958 } 1959 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1960 t.Fatal(err) 1961 } 1962 const sql = "SELECT Balance FROM Accounts" 1963 1964 qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil}) 1965 if err != nil { 1966 t.Fatal(err) 1967 } 1968 if len(qp.PlanNodes) == 0 { 1969 t.Error("got zero plan nodes, expected at least one") 1970 } 1971 1972 iter := client.Single().QueryWithStats(ctx, Statement{sql, nil}) 1973 defer iter.Stop() 1974 for { 1975 _, err := iter.Next() 1976 if err == iterator.Done { 1977 break 1978 } 1979 if err != nil { 1980 t.Fatal(err) 1981 } 1982 } 1983 if iter.QueryPlan == nil { 1984 t.Error("got nil QueryPlan, expected one") 1985 } 1986 if iter.QueryStats == nil { 1987 t.Error("got nil QueryStats, expected some") 1988 } 1989} 1990 1991func TestIntegration_InvalidDatabase(t *testing.T) { 1992 t.Parallel() 1993 1994 if databaseAdmin == nil { 1995 t.Skip("Integration tests skipped") 1996 } 1997 ctx := context.Background() 1998 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID) 1999 c, err := createClient(ctx, dbPath, SessionPoolConfig{}) 2000 // Client creation should succeed even if the database is invalid. 2001 if err != nil { 2002 t.Fatal(err) 2003 } 2004 _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"}) 2005 if msg, ok := matchError(err, codes.NotFound, ""); !ok { 2006 t.Fatal(msg) 2007 } 2008} 2009 2010func TestIntegration_ReadErrors(t *testing.T) { 2011 t.Parallel() 2012 2013 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2014 defer cancel() 2015 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 2016 defer cleanup() 2017 2018 var ms []*Mutation 2019 for i := 0; i < 2; i++ { 2020 ms = append(ms, InsertOrUpdate(testTable, 2021 testTableColumns, 2022 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")})) 2023 } 2024 if _, err := client.Apply(ctx, ms); err != nil { 2025 t.Fatal(err) 2026 } 2027 2028 // Read over invalid table fails 2029 _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"}) 2030 if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok { 2031 t.Error(msg) 2032 } 2033 // Read over invalid column fails 2034 _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"}) 2035 if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok { 2036 t.Error(msg) 2037 } 2038 2039 // Invalid query fails 2040 iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"}) 2041 defer iter.Stop() 2042 _, err = iter.Next() 2043 if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok { 2044 t.Error(msg) 2045 } 2046 2047 // Read should fail on cancellation. 2048 cctx, cancel := context.WithCancel(ctx) 2049 cancel() 2050 _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"}) 2051 if msg, ok := matchError(err, codes.Canceled, ""); !ok { 2052 t.Error(msg) 2053 } 2054 // Read should fail if deadline exceeded. 2055 dctx, cancel := context.WithTimeout(ctx, time.Nanosecond) 2056 defer cancel() 2057 <-dctx.Done() 2058 _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"}) 2059 if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok { 2060 t.Error(msg) 2061 } 2062 // Read should fail if there are multiple rows returned. 2063 _, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns) 2064 wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex) 2065 if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok { 2066 t.Error(msg) 2067 } 2068} 2069 2070// Test TransactionRunner. Test that transactions are aborted and retried as 2071// expected. 2072func TestIntegration_TransactionRunner(t *testing.T) { 2073 skipEmulatorTest(t) 2074 t.Parallel() 2075 2076 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2077 defer cancel() 2078 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2079 defer cleanup() 2080 2081 // Test 1: User error should abort the transaction. 2082 _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2083 tx.BufferWrite([]*Mutation{ 2084 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})}) 2085 return errors.New("user error") 2086 }) 2087 // Empty read. 2088 rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"})) 2089 if err != nil { 2090 t.Fatal(err) 2091 } 2092 if got, want := len(rows), 0; got != want { 2093 t.Errorf("Empty read, got %d, want %d.", got, want) 2094 } 2095 2096 // Test 2: Expect abort and retry. 2097 // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by 2098 // committing writes to the column txn2 have read, and expect the following 2099 // read to abort and txn2 retries. 2100 2101 // Set up two accounts 2102 accounts := []*Mutation{ 2103 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}), 2104 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}), 2105 } 2106 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 2107 t.Fatal(err) 2108 } 2109 2110 var ( 2111 cTxn1Start = make(chan struct{}) 2112 cTxn1Commit = make(chan struct{}) 2113 cTxn2Start = make(chan struct{}) 2114 wg sync.WaitGroup 2115 ) 2116 2117 // read balance, check error if we don't expect abort. 2118 readBalance := func(tx interface { 2119 ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) 2120 }, key int64, expectAbort bool) (int64, error) { 2121 var b int64 2122 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"}) 2123 if e != nil { 2124 if expectAbort && !isAbortedErr(e) { 2125 t.Errorf("ReadRow got %v, want Abort error.", e) 2126 } 2127 // Verify that we received and are able to extract retry info from 2128 // the aborted error. 2129 if _, hasRetryInfo := ExtractRetryDelay(e); !hasRetryInfo { 2130 t.Errorf("Got Abort error without RetryInfo\nGot: %v", e) 2131 } 2132 return b, e 2133 } 2134 if ce := r.Column(0, &b); ce != nil { 2135 return b, ce 2136 } 2137 return b, nil 2138 } 2139 2140 wg.Add(2) 2141 // Txn 1 2142 go func() { 2143 defer wg.Done() 2144 var once sync.Once 2145 _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2146 b, e := readBalance(tx, 1, false) 2147 if e != nil { 2148 return e 2149 } 2150 // txn 1 can abort, in that case we skip closing the channel on 2151 // retry. 2152 once.Do(func() { close(cTxn1Start) }) 2153 e = tx.BufferWrite([]*Mutation{ 2154 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})}) 2155 if e != nil { 2156 return e 2157 } 2158 // Wait for second transaction. 2159 <-cTxn2Start 2160 return nil 2161 }) 2162 close(cTxn1Commit) 2163 if e != nil { 2164 t.Errorf("Transaction 1 commit, got %v, want nil.", e) 2165 } 2166 }() 2167 // Txn 2 2168 go func() { 2169 // Wait until txn 1 starts. 2170 <-cTxn1Start 2171 defer wg.Done() 2172 var ( 2173 once sync.Once 2174 b1 int64 2175 b2 int64 2176 e error 2177 ) 2178 _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2179 if b1, e = readBalance(tx, 1, false); e != nil { 2180 return e 2181 } 2182 // Skip closing channel on retry. 2183 once.Do(func() { close(cTxn2Start) }) 2184 // Wait until txn 1 successfully commits. 2185 <-cTxn1Commit 2186 // Txn1 has committed and written a balance to the account. Now this 2187 // transaction (txn2) reads and re-writes the balance. The first 2188 // time through, it will abort because it overlaps with txn1. Then 2189 // it will retry after txn1 commits, and succeed. 2190 if b2, e = readBalance(tx, 2, true); e != nil { 2191 return e 2192 } 2193 return tx.BufferWrite([]*Mutation{ 2194 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})}) 2195 }) 2196 if e != nil { 2197 t.Errorf("Transaction 2 commit, got %v, want nil.", e) 2198 } 2199 }() 2200 wg.Wait() 2201 // Check that both transactions' effects are visible. 2202 for i := int64(1); i <= int64(2); i++ { 2203 if b, e := readBalance(client.Single(), i, false); e != nil { 2204 t.Fatalf("ReadBalance for key %d error %v.", i, e) 2205 } else if b != i { 2206 t.Errorf("Balance for key %d, got %d, want %d.", i, b, i) 2207 } 2208 } 2209} 2210 2211// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then 2212// serialize and deserialize both transaction and partition to be used in 2213// execution on another client, and compare results. 2214func TestIntegration_BatchQuery(t *testing.T) { 2215 t.Parallel() 2216 2217 // Set up testing environment. 2218 var ( 2219 client2 *Client 2220 err error 2221 ) 2222 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2223 defer cancel() 2224 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 2225 defer cleanup() 2226 2227 if err = populate(ctx, client); err != nil { 2228 t.Fatal(err) 2229 } 2230 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 2231 t.Fatal(err) 2232 } 2233 defer client2.Close() 2234 2235 // PartitionQuery 2236 var ( 2237 txn *BatchReadOnlyTransaction 2238 partitions []*Partition 2239 stmt = Statement{SQL: "SELECT * FROM test;"} 2240 ) 2241 2242 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 2243 t.Fatal(err) 2244 } 2245 defer txn.Cleanup(ctx) 2246 if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { 2247 t.Fatal(err) 2248 } 2249 2250 // Reconstruct BatchReadOnlyTransactionID and execute partitions 2251 var ( 2252 tid2 BatchReadOnlyTransactionID 2253 data []byte 2254 gotResult bool // if we get matching result from two separate txns 2255 ) 2256 if data, err = txn.ID.MarshalBinary(); err != nil { 2257 t.Fatalf("encoding failed %v", err) 2258 } 2259 if err = tid2.UnmarshalBinary(data); err != nil { 2260 t.Fatalf("decoding failed %v", err) 2261 } 2262 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 2263 2264 // Execute Partitions and compare results 2265 for i, p := range partitions { 2266 iter := txn.Execute(ctx, p) 2267 defer iter.Stop() 2268 p2 := serdesPartition(t, i, p) 2269 iter2 := txn2.Execute(ctx, &p2) 2270 defer iter2.Stop() 2271 2272 row1, err1 := iter.Next() 2273 row2, err2 := iter2.Next() 2274 if err1 != err2 { 2275 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 2276 continue 2277 } 2278 if !testEqual(row1, row2) { 2279 t.Fatalf("execution returned different values: %v, %v", row1, row2) 2280 continue 2281 } 2282 if row1 == nil { 2283 continue 2284 } 2285 var a, b string 2286 if err = row1.Columns(&a, &b); err != nil { 2287 t.Fatalf("failed to parse row %v", err) 2288 continue 2289 } 2290 if a == str1 && b == str2 { 2291 gotResult = true 2292 } 2293 } 2294 if !gotResult { 2295 t.Fatalf("execution didn't return expected values") 2296 } 2297} 2298 2299// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery 2300func TestIntegration_BatchRead(t *testing.T) { 2301 t.Parallel() 2302 2303 // Set up testing environment. 2304 var ( 2305 client2 *Client 2306 err error 2307 ) 2308 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2309 defer cancel() 2310 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 2311 defer cleanup() 2312 2313 if err = populate(ctx, client); err != nil { 2314 t.Fatal(err) 2315 } 2316 if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil { 2317 t.Fatal(err) 2318 } 2319 defer client2.Close() 2320 2321 // PartitionRead 2322 var ( 2323 txn *BatchReadOnlyTransaction 2324 partitions []*Partition 2325 ) 2326 2327 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 2328 t.Fatal(err) 2329 } 2330 defer txn.Cleanup(ctx) 2331 if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 2332 t.Fatal(err) 2333 } 2334 2335 // Reconstruct BatchReadOnlyTransactionID and execute partitions. 2336 var ( 2337 tid2 BatchReadOnlyTransactionID 2338 data []byte 2339 gotResult bool // if we get matching result from two separate txns 2340 ) 2341 if data, err = txn.ID.MarshalBinary(); err != nil { 2342 t.Fatalf("encoding failed %v", err) 2343 } 2344 if err = tid2.UnmarshalBinary(data); err != nil { 2345 t.Fatalf("decoding failed %v", err) 2346 } 2347 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 2348 2349 // Execute Partitions and compare results. 2350 for i, p := range partitions { 2351 iter := txn.Execute(ctx, p) 2352 defer iter.Stop() 2353 p2 := serdesPartition(t, i, p) 2354 iter2 := txn2.Execute(ctx, &p2) 2355 defer iter2.Stop() 2356 2357 row1, err1 := iter.Next() 2358 row2, err2 := iter2.Next() 2359 if err1 != err2 { 2360 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 2361 continue 2362 } 2363 if !testEqual(row1, row2) { 2364 t.Fatalf("execution returned different values: %v, %v", row1, row2) 2365 continue 2366 } 2367 if row1 == nil { 2368 continue 2369 } 2370 var a, b string 2371 if err = row1.Columns(&a, &b); err != nil { 2372 t.Fatalf("failed to parse row %v", err) 2373 continue 2374 } 2375 if a == str1 && b == str2 { 2376 gotResult = true 2377 } 2378 } 2379 if !gotResult { 2380 t.Fatalf("execution didn't return expected values") 2381 } 2382} 2383 2384// Test normal txReadEnv method on BatchReadOnlyTransaction. 2385func TestIntegration_BROTNormal(t *testing.T) { 2386 t.Parallel() 2387 2388 // Set up testing environment and create txn. 2389 var ( 2390 txn *BatchReadOnlyTransaction 2391 err error 2392 row *Row 2393 i int64 2394 ) 2395 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2396 defer cancel() 2397 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, simpleDBStatements) 2398 defer cleanup() 2399 2400 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 2401 t.Fatal(err) 2402 } 2403 defer txn.Cleanup(ctx) 2404 if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 2405 t.Fatal(err) 2406 } 2407 // Normal query should work with BatchReadOnlyTransaction. 2408 stmt2 := Statement{SQL: "SELECT 1"} 2409 iter := txn.Query(ctx, stmt2) 2410 defer iter.Stop() 2411 2412 row, err = iter.Next() 2413 if err != nil { 2414 t.Errorf("query failed with %v", err) 2415 } 2416 if err = row.Columns(&i); err != nil { 2417 t.Errorf("failed to parse row %v", err) 2418 } 2419} 2420 2421func TestIntegration_CommitTimestamp(t *testing.T) { 2422 t.Parallel() 2423 2424 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2425 defer cancel() 2426 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements) 2427 defer cleanup() 2428 2429 type testTableRow struct { 2430 Key string 2431 Ts NullTime 2432 } 2433 2434 var ( 2435 cts1, cts2, ts1, ts2 time.Time 2436 err error 2437 ) 2438 2439 // Apply mutation in sequence, expect to see commit timestamp in good order, 2440 // check also the commit timestamp returned 2441 for _, it := range []struct { 2442 k string 2443 t *time.Time 2444 }{ 2445 {"a", &cts1}, 2446 {"b", &cts2}, 2447 } { 2448 tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}} 2449 m, err := InsertStruct("TestTable", tt) 2450 if err != nil { 2451 t.Fatal(err) 2452 } 2453 *it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()) 2454 if err != nil { 2455 t.Fatal(err) 2456 } 2457 } 2458 2459 txn := client.ReadOnlyTransaction() 2460 for _, it := range []struct { 2461 k string 2462 t *time.Time 2463 }{ 2464 {"a", &ts1}, 2465 {"b", &ts2}, 2466 } { 2467 if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil { 2468 t.Fatal(err) 2469 } else { 2470 var got testTableRow 2471 if err := r.ToStruct(&got); err != nil { 2472 t.Fatal(err) 2473 } 2474 *it.t = got.Ts.Time 2475 } 2476 } 2477 if !cts1.Equal(ts1) { 2478 t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1) 2479 } 2480 if !cts2.Equal(ts2) { 2481 t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2) 2482 } 2483 2484 // Try writing a timestamp in the future to commit timestamp, expect error. 2485 _, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce()) 2486 if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok { 2487 t.Error(msg) 2488 } 2489} 2490 2491func TestIntegration_DML(t *testing.T) { 2492 t.Parallel() 2493 2494 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2495 defer cancel() 2496 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2497 defer cleanup() 2498 2499 // Function that reads a single row's first name from within a transaction. 2500 readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) { 2501 row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"}) 2502 if err != nil { 2503 return "", err 2504 } 2505 var fn string 2506 if err := row.Column(0, &fn); err != nil { 2507 return "", err 2508 } 2509 return fn, nil 2510 } 2511 2512 // Function that reads multiple rows' first names from outside a read/write 2513 // transaction. 2514 readFirstNames := func(keys ...int) []string { 2515 var ks []KeySet 2516 for _, k := range keys { 2517 ks = append(ks, Key{k}) 2518 } 2519 iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"}) 2520 var got []string 2521 var fn string 2522 err := iter.Do(func(row *Row) error { 2523 if err := row.Column(0, &fn); err != nil { 2524 return err 2525 } 2526 got = append(got, fn) 2527 return nil 2528 }) 2529 if err != nil { 2530 t.Fatalf("readFirstNames(%v): %v", keys, err) 2531 } 2532 return got 2533 } 2534 2535 // Use ReadWriteTransaction.Query to execute a DML statement. 2536 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2537 iter := tx.Query(ctx, Statement{ 2538 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`, 2539 }) 2540 defer iter.Stop() 2541 row, err := iter.Next() 2542 if ErrCode(err) == codes.Aborted { 2543 return err 2544 } 2545 if err != iterator.Done { 2546 t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err) 2547 } 2548 if iter.RowCount != 1 { 2549 t.Errorf("row count: got %d, want 1", iter.RowCount) 2550 } 2551 // The results of the DML statement should be visible to the transaction. 2552 got, err := readFirstName(tx, 1) 2553 if err != nil { 2554 return err 2555 } 2556 if want := "Umm"; got != want { 2557 t.Errorf("got %q, want %q", got, want) 2558 } 2559 return nil 2560 }) 2561 if err != nil { 2562 t.Fatal(err) 2563 } 2564 2565 // Use ReadWriteTransaction.Update to execute a DML statement. 2566 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2567 count, err := tx.Update(ctx, Statement{ 2568 SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`, 2569 }) 2570 if err != nil { 2571 return err 2572 } 2573 if count != 1 { 2574 t.Errorf("row count: got %d, want 1", count) 2575 } 2576 got, err := readFirstName(tx, 2) 2577 if err != nil { 2578 return err 2579 } 2580 if want := "Eduard"; got != want { 2581 t.Errorf("got %q, want %q", got, want) 2582 } 2583 return nil 2584 2585 }) 2586 if err != nil { 2587 t.Fatal(err) 2588 } 2589 2590 // Roll back a DML statement and confirm that it didn't happen. 2591 var fail = errors.New("fail") 2592 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2593 _, err := tx.Update(ctx, Statement{ 2594 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 2595 }) 2596 if err != nil { 2597 return err 2598 } 2599 return fail 2600 }) 2601 if err != fail { 2602 t.Fatalf("rolling back: got error %v, want the error 'fail'", err) 2603 } 2604 _, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 2605 if got, want := ErrCode(err), codes.NotFound; got != want { 2606 t.Errorf("got %s, want %s", got, want) 2607 } 2608 2609 // Run two DML statements in the same transaction. 2610 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2611 _, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`}) 2612 if err != nil { 2613 return err 2614 } 2615 _, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`}) 2616 if err != nil { 2617 return err 2618 } 2619 return nil 2620 }) 2621 if err != nil { 2622 t.Fatal(err) 2623 } 2624 got := readFirstNames(1, 2) 2625 want := []string{"Oum", "Eddie"} 2626 if !testEqual(got, want) { 2627 t.Errorf("got %v, want %v", got, want) 2628 } 2629 2630 // Run a DML statement and an ordinary mutation in the same transaction. 2631 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2632 _, err := tx.Update(ctx, Statement{ 2633 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 2634 }) 2635 if err != nil { 2636 return err 2637 } 2638 return tx.BufferWrite([]*Mutation{ 2639 Insert("Singers", []string{"SingerId", "FirstName", "LastName"}, 2640 []interface{}{4, "Andy", "Irvine"}), 2641 }) 2642 }) 2643 if err != nil { 2644 t.Fatal(err) 2645 } 2646 got = readFirstNames(3, 4) 2647 want = []string{"Audra", "Andy"} 2648 if !testEqual(got, want) { 2649 t.Errorf("got %v, want %v", got, want) 2650 } 2651 2652 // Attempt to run a query using update. 2653 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 2654 _, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`}) 2655 return err 2656 }) 2657 if got, want := ErrCode(err), codes.InvalidArgument; got != want { 2658 t.Errorf("got %s, want %s", got, want) 2659 } 2660} 2661 2662func TestIntegration_StructParametersBind(t *testing.T) { 2663 t.Parallel() 2664 2665 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2666 defer cancel() 2667 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil) 2668 defer cleanup() 2669 2670 type tRow []interface{} 2671 type tRows []struct{ trow tRow } 2672 2673 type allFields struct { 2674 Stringf string 2675 Intf int 2676 Boolf bool 2677 Floatf float64 2678 Bytef []byte 2679 Timef time.Time 2680 Datef civil.Date 2681 } 2682 allColumns := []string{ 2683 "Stringf", 2684 "Intf", 2685 "Boolf", 2686 "Floatf", 2687 "Bytef", 2688 "Timef", 2689 "Datef", 2690 } 2691 s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1} 2692 s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2} 2693 2694 dynamicStructType := reflect.StructOf([]reflect.StructField{ 2695 {Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`}, 2696 }) 2697 s3 := reflect.New(dynamicStructType) 2698 s3.Elem().Field(0).Set(reflect.ValueOf(t1)) 2699 2700 for i, test := range []struct { 2701 param interface{} 2702 sql string 2703 cols []string 2704 trows tRows 2705 }{ 2706 // Struct value. 2707 { 2708 s1, 2709 "SELECT" + 2710 " @p.Stringf," + 2711 " @p.Intf," + 2712 " @p.Boolf," + 2713 " @p.Floatf," + 2714 " @p.Bytef," + 2715 " @p.Timef," + 2716 " @p.Datef", 2717 allColumns, 2718 tRows{ 2719 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 2720 }, 2721 }, 2722 // Array of struct value. 2723 { 2724 []allFields{s1, s2}, 2725 "SELECT * FROM UNNEST(@p)", 2726 allColumns, 2727 tRows{ 2728 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 2729 {tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}}, 2730 }, 2731 }, 2732 // Null struct. 2733 { 2734 (*allFields)(nil), 2735 "SELECT @p IS NULL", 2736 []string{""}, 2737 tRows{ 2738 {tRow{true}}, 2739 }, 2740 }, 2741 // Null Array of struct. 2742 { 2743 []allFields(nil), 2744 "SELECT @p IS NULL", 2745 []string{""}, 2746 tRows{ 2747 {tRow{true}}, 2748 }, 2749 }, 2750 // Empty struct. 2751 { 2752 struct{}{}, 2753 "SELECT @p IS NULL ", 2754 []string{""}, 2755 tRows{ 2756 {tRow{false}}, 2757 }, 2758 }, 2759 // Empty array of struct. 2760 { 2761 []allFields{}, 2762 "SELECT * FROM UNNEST(@p) ", 2763 allColumns, 2764 tRows{}, 2765 }, 2766 // Struct with duplicate fields. 2767 { 2768 struct { 2769 A int `spanner:"field"` 2770 B int `spanner:"field"` 2771 }{10, 20}, 2772 "SELECT * FROM UNNEST([@p]) ", 2773 []string{"field", "field"}, 2774 tRows{ 2775 {tRow{10, 20}}, 2776 }, 2777 }, 2778 // Struct with unnamed fields. 2779 { 2780 struct { 2781 A string `spanner:""` 2782 }{"hello"}, 2783 "SELECT * FROM UNNEST([@p]) ", 2784 []string{""}, 2785 tRows{ 2786 {tRow{"hello"}}, 2787 }, 2788 }, 2789 // Mixed struct. 2790 { 2791 struct { 2792 DynamicStructField interface{} `spanner:"f1"` 2793 ArrayStructField []*allFields `spanner:"f2"` 2794 }{ 2795 DynamicStructField: s3.Interface(), 2796 ArrayStructField: []*allFields{nil}, 2797 }, 2798 "SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ", 2799 []string{"ff1", "", ""}, 2800 tRows{ 2801 {tRow{t1, 1, true}}, 2802 }, 2803 }, 2804 } { 2805 iter := client.Single().Query(ctx, Statement{ 2806 SQL: test.sql, 2807 Params: map[string]interface{}{"p": test.param}, 2808 }) 2809 var gotRows []*Row 2810 err := iter.Do(func(r *Row) error { 2811 gotRows = append(gotRows, r) 2812 return nil 2813 }) 2814 if err != nil { 2815 t.Errorf("Failed to execute test case %d, error: %v", i, err) 2816 } 2817 2818 var wantRows []*Row 2819 for j, row := range test.trows { 2820 r, err := NewRow(test.cols, row.trow) 2821 if err != nil { 2822 t.Errorf("Invalid row %d in test case %d", j, i) 2823 } 2824 wantRows = append(wantRows, r) 2825 } 2826 if !testEqual(gotRows, wantRows) { 2827 t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows) 2828 } 2829 } 2830} 2831 2832func TestIntegration_PDML(t *testing.T) { 2833 t.Parallel() 2834 2835 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2836 defer cancel() 2837 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2838 defer cleanup() 2839 2840 columns := []string{"SingerId", "FirstName", "LastName"} 2841 2842 // Populate the Singers table. 2843 var muts []*Mutation 2844 for _, row := range [][]interface{}{ 2845 {1, "Umm", "Kulthum"}, 2846 {2, "Eduard", "Khil"}, 2847 {3, "Audra", "McDonald"}, 2848 {4, "Enrique", "Iglesias"}, 2849 {5, "Shakira", "Ripoll"}, 2850 } { 2851 muts = append(muts, Insert("Singers", columns, row)) 2852 } 2853 if _, err := client.Apply(ctx, muts); err != nil { 2854 t.Fatal(err) 2855 } 2856 // Identifiers in PDML statements must be fully qualified. 2857 // TODO(jba): revisit the above. 2858 count, err := client.PartitionedUpdate(ctx, Statement{ 2859 SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @end`, 2860 Params: map[string]interface{}{ 2861 "end": 3, 2862 }, 2863 }) 2864 if err != nil { 2865 t.Fatal(err) 2866 } 2867 if want := int64(3); count != want { 2868 t.Fatalf("got %d, want %d", count, want) 2869 } 2870 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2871 if err != nil { 2872 t.Fatal(err) 2873 } 2874 want := [][]interface{}{ 2875 {int64(1), "changed", "Kulthum"}, 2876 {int64(2), "changed", "Khil"}, 2877 {int64(3), "changed", "McDonald"}, 2878 {int64(4), "Enrique", "Iglesias"}, 2879 {int64(5), "Shakira", "Ripoll"}, 2880 } 2881 if !testEqual(got, want) { 2882 t.Errorf("\ngot %v\nwant%v", got, want) 2883 } 2884} 2885 2886func TestIntegration_BatchDML(t *testing.T) { 2887 t.Parallel() 2888 2889 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2890 defer cancel() 2891 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2892 defer cleanup() 2893 2894 columns := []string{"SingerId", "FirstName", "LastName"} 2895 2896 // Populate the Singers table. 2897 var muts []*Mutation 2898 for _, row := range [][]interface{}{ 2899 {1, "Umm", "Kulthum"}, 2900 {2, "Eduard", "Khil"}, 2901 {3, "Audra", "McDonald"}, 2902 } { 2903 muts = append(muts, Insert("Singers", columns, row)) 2904 } 2905 if _, err := client.Apply(ctx, muts); err != nil { 2906 t.Fatal(err) 2907 } 2908 2909 var counts []int64 2910 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2911 counts, err = tx.BatchUpdate(ctx, []Statement{ 2912 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2913 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2914 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2915 }) 2916 return err 2917 }) 2918 2919 if err != nil { 2920 t.Fatal(err) 2921 } 2922 if want := []int64{1, 1, 1}; !testEqual(counts, want) { 2923 t.Fatalf("got %d, want %d", counts, want) 2924 } 2925 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2926 if err != nil { 2927 t.Fatal(err) 2928 } 2929 want := [][]interface{}{ 2930 {int64(1), "changed 1", "Kulthum"}, 2931 {int64(2), "changed 2", "Khil"}, 2932 {int64(3), "changed 3", "McDonald"}, 2933 } 2934 if !testEqual(got, want) { 2935 t.Errorf("\ngot %v\nwant%v", got, want) 2936 } 2937} 2938 2939func TestIntegration_BatchDML_NoStatements(t *testing.T) { 2940 t.Parallel() 2941 2942 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2943 defer cancel() 2944 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2945 defer cleanup() 2946 2947 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2948 _, err = tx.BatchUpdate(ctx, []Statement{}) 2949 return err 2950 }) 2951 if err == nil { 2952 t.Fatal("expected error, got nil") 2953 } 2954 if s, ok := status.FromError(err); ok { 2955 if s.Code() != codes.InvalidArgument { 2956 t.Fatalf("expected InvalidArgument, got %v", err) 2957 } 2958 } else { 2959 t.Fatalf("expected InvalidArgument, got %v", err) 2960 } 2961} 2962 2963func TestIntegration_BatchDML_TwoStatements(t *testing.T) { 2964 t.Parallel() 2965 2966 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2967 defer cancel() 2968 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 2969 defer cleanup() 2970 2971 columns := []string{"SingerId", "FirstName", "LastName"} 2972 2973 // Populate the Singers table. 2974 var muts []*Mutation 2975 for _, row := range [][]interface{}{ 2976 {1, "Umm", "Kulthum"}, 2977 {2, "Eduard", "Khil"}, 2978 {3, "Audra", "McDonald"}, 2979 } { 2980 muts = append(muts, Insert("Singers", columns, row)) 2981 } 2982 if _, err := client.Apply(ctx, muts); err != nil { 2983 t.Fatal(err) 2984 } 2985 2986 var updateCount int64 2987 var batchCounts []int64 2988 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2989 batchCounts, err = tx.BatchUpdate(ctx, []Statement{ 2990 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2991 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2992 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2993 }) 2994 if err != nil { 2995 return err 2996 } 2997 2998 updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}) 2999 return err 3000 }) 3001 if err != nil { 3002 t.Fatal(err) 3003 } 3004 if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) { 3005 t.Fatalf("got %d, want %d", batchCounts, want) 3006 } 3007 if updateCount != 1 { 3008 t.Fatalf("got %v, want 1", updateCount) 3009 } 3010} 3011 3012func TestIntegration_BatchDML_Error(t *testing.T) { 3013 t.Parallel() 3014 3015 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 3016 defer cancel() 3017 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) 3018 defer cleanup() 3019 3020 columns := []string{"SingerId", "FirstName", "LastName"} 3021 3022 // Populate the Singers table. 3023 var muts []*Mutation 3024 for _, row := range [][]interface{}{ 3025 {1, "Umm", "Kulthum"}, 3026 {2, "Eduard", "Khil"}, 3027 {3, "Audra", "McDonald"}, 3028 } { 3029 muts = append(muts, Insert("Singers", columns, row)) 3030 } 3031 if _, err := client.Apply(ctx, muts); err != nil { 3032 t.Fatal(err) 3033 } 3034 3035 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 3036 counts, err := tx.BatchUpdate(ctx, []Statement{ 3037 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 3038 {SQL: `some illegal statement`}, 3039 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 3040 }) 3041 if err == nil { 3042 t.Fatal("expected err, got nil") 3043 } 3044 // The statement may also have been aborted by Spanner, and in that 3045 // case we should just let the client library retry the transaction. 3046 if status.Code(err) == codes.Aborted { 3047 return err 3048 } 3049 if want := []int64{1}; !testEqual(counts, want) { 3050 t.Fatalf("got %d, want %d", counts, want) 3051 } 3052 3053 got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns)) 3054 // Aborted error is ok, just retry the transaction. 3055 if status.Code(err) == codes.Aborted { 3056 return err 3057 } 3058 if err != nil { 3059 t.Fatal(err) 3060 } 3061 want := [][]interface{}{ 3062 {int64(1), "changed 1", "Kulthum"}, 3063 {int64(2), "Eduard", "Khil"}, 3064 {int64(3), "Audra", "McDonald"}, 3065 } 3066 if !testEqual(got, want) { 3067 t.Errorf("\ngot %v\nwant%v", got, want) 3068 } 3069 3070 return nil 3071 }) 3072 if err != nil { 3073 t.Fatal(err) 3074 } 3075} 3076 3077func TestIntegration_StartBackupOperation(t *testing.T) { 3078 skipEmulatorTest(t) 3079 t.Parallel() 3080 3081 // Backups can be slow, so use a 30 minute timeout. 3082 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) 3083 defer cancel() 3084 _, testDatabaseName, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, backuDBStatements) 3085 defer cleanup() 3086 3087 backupID := backupIDSpace.New() 3088 backupName := fmt.Sprintf("projects/%s/instances/%s/backups/%s", testProjectID, testInstanceID, backupID) 3089 // Minimum expiry time is 6 hours 3090 expires := time.Now().Add(time.Hour * 7) 3091 respLRO, err := databaseAdmin.StartBackupOperation(ctx, backupID, testDatabaseName, expires) 3092 if err != nil { 3093 t.Fatal(err) 3094 } 3095 3096 _, err = respLRO.Wait(ctx) 3097 if err != nil { 3098 t.Fatal(err) 3099 } 3100 respMetadata, err := respLRO.Metadata() 3101 if err != nil { 3102 t.Fatalf("backup response metadata, got error %v, want nil", err) 3103 } 3104 if respMetadata.Database != testDatabaseName { 3105 t.Fatalf("backup database name, got %s, want %s", respMetadata.Database, testDatabaseName) 3106 } 3107 if respMetadata.Progress.ProgressPercent != 100 { 3108 t.Fatalf("backup progress percent, got %d, want 100", respMetadata.Progress.ProgressPercent) 3109 } 3110 respCheck, err := databaseAdmin.GetBackup(ctx, &adminpb.GetBackupRequest{Name: backupName}) 3111 if err != nil { 3112 t.Fatalf("backup metadata, got error %v, want nil", err) 3113 } 3114 if respCheck.CreateTime == nil { 3115 t.Fatal("backup create time, got nil, want non-nil") 3116 } 3117 if respCheck.State != adminpb.Backup_READY { 3118 t.Fatalf("backup state, got %v, want %v", respCheck.State, adminpb.Backup_READY) 3119 } 3120 if respCheck.SizeBytes == 0 { 3121 t.Fatalf("backup size, got %d, want non-zero", respCheck.SizeBytes) 3122 } 3123} 3124 3125// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. 3126func TestIntegration_DirectPathFallback(t *testing.T) { 3127 t.Parallel() 3128 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 3129 defer cancel() 3130 // Set up testing environment. 3131 client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) 3132 defer cleanup() 3133 3134 if !dpConfig.attemptDirectPath { 3135 return 3136 } 3137 if len(blackholeDpv6Cmd) == 0 { 3138 t.Fatal("-it.blackhole-dpv6-cmd unset") 3139 } 3140 if len(blackholeDpv4Cmd) == 0 { 3141 t.Fatal("-it.blackhole-dpv4-cmd unset") 3142 } 3143 if len(allowDpv6Cmd) == 0 { 3144 t.Fatal("-it.allowdpv6-cmd unset") 3145 } 3146 if len(allowDpv4Cmd) == 0 { 3147 t.Fatal("-it.allowdpv4-cmd unset") 3148 } 3149 3150 // Precondition: wait for DirectPath to connect. 3151 countEnough := examineTraffic(ctx, client /*blackholeDP = */, false) 3152 if !countEnough { 3153 t.Fatalf("Failed to observe RPCs over DirectPath") 3154 } 3155 3156 // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. 3157 blackholeOrAllowDirectPath(t /*blackholeDP = */, true) 3158 countEnough = examineTraffic(ctx, client /*blackholeDP = */, true) 3159 if !countEnough { 3160 t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") 3161 } 3162 3163 // Disable the blackhole, and client should use DirectPath again. 3164 blackholeOrAllowDirectPath(t /*blackholeDP = */, false) 3165 countEnough = examineTraffic(ctx, client /*blackholeDP = */, false) 3166 if !countEnough { 3167 t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") 3168 } 3169} 3170 3171// Prepare initializes Cloud Spanner testing DB and clients. 3172func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { 3173 if databaseAdmin == nil { 3174 t.Skip("Integration tests skipped") 3175 } 3176 // Construct a unique test DB name. 3177 dbName := dbNameSpace.New() 3178 3179 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName) 3180 // Create database and tables. 3181 op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{ 3182 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 3183 CreateStatement: "CREATE DATABASE " + dbName, 3184 ExtraStatements: statements, 3185 }) 3186 if err != nil { 3187 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 3188 } 3189 if _, err := op.Wait(ctx); err != nil { 3190 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 3191 } 3192 client, err := createClient(ctx, dbPath, spc) 3193 if err != nil { 3194 t.Fatalf("cannot create data client on DB %v: %v", dbPath, err) 3195 } 3196 return client, dbPath, func() { 3197 client.Close() 3198 } 3199} 3200 3201func cleanupInstances() { 3202 if instanceAdmin == nil { 3203 // Integration tests skipped. 3204 return 3205 } 3206 3207 ctx := context.Background() 3208 parent := fmt.Sprintf("projects/%v", testProjectID) 3209 iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{ 3210 Parent: parent, 3211 Filter: "name:gotest-", 3212 }) 3213 expireAge := 24 * time.Hour 3214 3215 for { 3216 inst, err := iter.Next() 3217 if err == iterator.Done { 3218 break 3219 } 3220 if err != nil { 3221 panic(err) 3222 } 3223 3224 _, instID, err := parseInstanceName(inst.Name) 3225 if err != nil { 3226 log.Printf("Error: %v\n", err) 3227 continue 3228 } 3229 if instanceNameSpace.Older(instID, expireAge) { 3230 log.Printf("Deleting instance %s", inst.Name) 3231 deleteInstanceAndBackups(ctx, inst.Name) 3232 } 3233 } 3234} 3235 3236func deleteInstanceAndBackups(ctx context.Context, instanceName string) { 3237 // First delete any lingering backups that might have been left on 3238 // the instance. 3239 backups := databaseAdmin.ListBackups(ctx, &adminpb.ListBackupsRequest{Parent: instanceName}) 3240 for { 3241 backup, err := backups.Next() 3242 if err == iterator.Done { 3243 break 3244 } 3245 if err != nil { 3246 log.Printf("failed to retrieve backups from instance %s because of error %v", instanceName, err) 3247 break 3248 } 3249 if err := databaseAdmin.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{Name: backup.Name}); err != nil { 3250 log.Printf("failed to delete backup %s (error %v)", backup.Name, err) 3251 } 3252 } 3253 3254 if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: instanceName}); err != nil { 3255 log.Printf("failed to delete instance %s (error %v), might need a manual removal", 3256 instanceName, err) 3257 } 3258} 3259 3260func rangeReads(ctx context.Context, t *testing.T, client *Client) { 3261 checkRange := func(ks KeySet, wantNums ...int) { 3262 if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok { 3263 t.Errorf("key set %+v: %s", ks, msg) 3264 } 3265 } 3266 3267 checkRange(Key{"k1"}, 1) 3268 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4) 3269 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5) 3270 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5) 3271 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4) 3272 3273 // Partial key specification. 3274 checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9) 3275 checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9) 3276 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10) 3277 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11) 3278 3279 // The following produce empty ranges. 3280 // TODO(jba): Consider a multi-part key to illustrate partial key behavior. 3281 // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen}) 3282 // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen}) 3283 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen}) 3284 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed}) 3285 3286 // Prefix is component-wise, not string prefix. 3287 checkRange(Key{"k1"}.AsPrefix(), 1) 3288 checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 3289 3290 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 3291} 3292 3293func indexRangeReads(ctx context.Context, t *testing.T, client *Client) { 3294 checkRange := func(ks KeySet, wantNums ...int) { 3295 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns), 3296 wantNums); !ok { 3297 t.Errorf("key set %+v: %s", ks, msg) 3298 } 3299 } 3300 3301 checkRange(Key{"v1"}, 1) 3302 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4) 3303 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5) 3304 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5) 3305 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4) 3306 3307 // // Partial key specification. 3308 checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9) 3309 checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9) 3310 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10) 3311 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11) 3312 3313 // // The following produce empty ranges. 3314 // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen}) 3315 // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen}) 3316 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen}) 3317 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed}) 3318 3319 // // Prefix is component-wise, not string prefix. 3320 checkRange(Key{"v1"}.AsPrefix(), 1) 3321 checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 3322 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 3323 3324 // Read from an index with DESC ordering. 3325 wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} 3326 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns), 3327 wantNums); !ok { 3328 t.Errorf("desc: %s", msg) 3329 } 3330} 3331 3332type testTableRow struct{ Key, StringValue string } 3333 3334func compareRows(iter *RowIterator, wantNums []int) (string, bool) { 3335 rows, err := readAllTestTable(iter) 3336 if err != nil { 3337 return err.Error(), false 3338 } 3339 want := map[string]string{} 3340 for _, n := range wantNums { 3341 want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n) 3342 } 3343 got := map[string]string{} 3344 for _, r := range rows { 3345 got[r.Key] = r.StringValue 3346 } 3347 if !testEqual(got, want) { 3348 return fmt.Sprintf("got %v, want %v", got, want), false 3349 } 3350 return "", true 3351} 3352 3353func isNaN(x interface{}) bool { 3354 f, ok := x.(float64) 3355 if !ok { 3356 return false 3357 } 3358 return math.IsNaN(f) 3359} 3360 3361// createClient creates Cloud Spanner data client. 3362func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) { 3363 opts := grpcHeaderChecker.CallOptions() 3364 if spannerHost != "" { 3365 opts = append(opts, option.WithEndpoint(spannerHost)) 3366 } 3367 if dpConfig.attemptDirectPath { 3368 opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo)))) 3369 } 3370 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...) 3371 if err != nil { 3372 return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) 3373 } 3374 return client, nil 3375} 3376 3377// populate prepares the database with some data. 3378func populate(ctx context.Context, client *Client) error { 3379 // Populate data 3380 var err error 3381 m := InsertMap("test", map[string]interface{}{ 3382 "a": str1, 3383 "b": str2, 3384 }) 3385 _, err = client.Apply(ctx, []*Mutation{m}) 3386 return err 3387} 3388 3389func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) { 3390 if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) { 3391 return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false 3392 } 3393 return "", true 3394} 3395 3396func rowToValues(r *Row) ([]interface{}, error) { 3397 var x int64 3398 var y, z string 3399 if err := r.Column(0, &x); err != nil { 3400 return nil, err 3401 } 3402 if err := r.Column(1, &y); err != nil { 3403 return nil, err 3404 } 3405 if err := r.Column(2, &z); err != nil { 3406 return nil, err 3407 } 3408 return []interface{}{x, y, z}, nil 3409} 3410 3411func readAll(iter *RowIterator) ([][]interface{}, error) { 3412 defer iter.Stop() 3413 var vals [][]interface{} 3414 for { 3415 row, err := iter.Next() 3416 if err == iterator.Done { 3417 return vals, nil 3418 } 3419 if err != nil { 3420 return nil, err 3421 } 3422 v, err := rowToValues(row) 3423 if err != nil { 3424 return nil, err 3425 } 3426 vals = append(vals, v) 3427 } 3428} 3429 3430func readAllTestTable(iter *RowIterator) ([]testTableRow, error) { 3431 defer iter.Stop() 3432 var vals []testTableRow 3433 for { 3434 row, err := iter.Next() 3435 if err == iterator.Done { 3436 if iter.Metadata == nil { 3437 // All queries should always return metadata, regardless whether 3438 // they return any rows or not. 3439 return nil, errors.New("missing metadata from query") 3440 } 3441 return vals, nil 3442 } 3443 if err != nil { 3444 return nil, err 3445 } 3446 var ttr testTableRow 3447 if err := row.ToStruct(&ttr); err != nil { 3448 return nil, err 3449 } 3450 vals = append(vals, ttr) 3451 } 3452} 3453 3454func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) { 3455 defer iter.Stop() 3456 var vals [][]interface{} 3457 for { 3458 row, err := iter.Next() 3459 if err == iterator.Done { 3460 return vals, nil 3461 } 3462 if err != nil { 3463 return nil, err 3464 } 3465 var id, balance int64 3466 var nickname string 3467 err = row.Columns(&id, &nickname, &balance) 3468 if err != nil { 3469 return nil, err 3470 } 3471 vals = append(vals, []interface{}{id, nickname, balance}) 3472 } 3473} 3474 3475func maxDuration(a, b time.Duration) time.Duration { 3476 if a > b { 3477 return a 3478 } 3479 return b 3480} 3481 3482func isEmulatorEnvSet() bool { 3483 return os.Getenv("SPANNER_EMULATOR_HOST") != "" 3484} 3485 3486func skipEmulatorTest(t *testing.T) { 3487 if isEmulatorEnvSet() { 3488 t.Skip("Skipping testing against the emulator.") 3489 } 3490} 3491 3492func verifyDirectPathRemoteAddress(t *testing.T) { 3493 t.Helper() 3494 if !dpConfig.attemptDirectPath { 3495 return 3496 } 3497 if remoteIP, res := isDirectPathRemoteAddress(); !res { 3498 if dpConfig.directPathIPv4Only { 3499 t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP) 3500 } else { 3501 t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP) 3502 } 3503 } 3504} 3505 3506func isDirectPathRemoteAddress() (_ string, _ bool) { 3507 remoteIP := peerInfo.Addr.String() 3508 // DirectPath ipv4-only can only use ipv4 traffic. 3509 if dpConfig.directPathIPv4Only { 3510 return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) 3511 } 3512 // DirectPath ipv6 can use either ipv4 or ipv6 traffic. 3513 return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) 3514} 3515 3516// examineTraffic counts RPCs use DirectPath or CFE traffic. 3517func examineTraffic(ctx context.Context, client *Client, expectDP bool) bool { 3518 var numCount uint64 3519 const ( 3520 numRPCsToSend = 20 3521 minCompleteRPC = 40 3522 ) 3523 countEnough := false 3524 start := time.Now() 3525 for !countEnough && time.Since(start) < 2*time.Minute { 3526 for i := 0; i < numRPCsToSend; i++ { 3527 _, _ = readAllTestTable(client.Single().Read(ctx, testTable, Key{"k1"}, testTableColumns)) 3528 if _, useDP := isDirectPathRemoteAddress(); useDP != expectDP { 3529 numCount++ 3530 } 3531 time.Sleep(100 * time.Millisecond) 3532 countEnough = numCount >= minCompleteRPC 3533 } 3534 } 3535 return countEnough 3536} 3537 3538func blackholeOrAllowDirectPath(t *testing.T, blackholeDP bool) { 3539 if dpConfig.directPathIPv4Only { 3540 if blackholeDP { 3541 cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) 3542 out, _ := cmdRes.CombinedOutput() 3543 t.Logf(string(out)) 3544 } else { 3545 cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) 3546 out, _ := cmdRes.CombinedOutput() 3547 t.Logf(string(out)) 3548 } 3549 return 3550 } 3551 // DirectPath supports both ipv4 and ipv6 3552 if blackholeDP { 3553 cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) 3554 out, _ := cmdRes.CombinedOutput() 3555 t.Logf(string(out)) 3556 cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) 3557 out, _ = cmdRes.CombinedOutput() 3558 t.Logf(string(out)) 3559 } else { 3560 cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) 3561 out, _ := cmdRes.CombinedOutput() 3562 t.Logf(string(out)) 3563 cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) 3564 out, _ = cmdRes.CombinedOutput() 3565 t.Logf(string(out)) 3566 } 3567} 3568