1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "os" 27 "reflect" 28 "strings" 29 "sync" 30 "testing" 31 "time" 32 33 "cloud.google.com/go/civil" 34 "cloud.google.com/go/internal/testutil" 35 "cloud.google.com/go/internal/uid" 36 database "cloud.google.com/go/spanner/admin/database/apiv1" 37 "google.golang.org/api/iterator" 38 "google.golang.org/api/option" 39 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 40 "google.golang.org/grpc/codes" 41 "google.golang.org/grpc/status" 42) 43 44var ( 45 // testProjectID specifies the project used for testing. It can be changed 46 // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. 47 testProjectID = testutil.ProjID() 48 49 dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 50 51 // TODO(deklerk): When we can programmatically create instances, we should use 52 // uid.New as the test instance name. 53 // testInstanceID specifies the Cloud Spanner instance used for testing. 54 testInstanceID = "go-integration-test" 55 56 testTable = "TestTable" 57 testTableIndex = "TestTableByValue" 58 testTableColumns = []string{"Key", "StringValue"} 59 60 // admin is a spanner.DatabaseAdminClient. 61 admin *database.DatabaseAdminClient 62 63 singerDBStatements = []string{ 64 `CREATE TABLE Singers ( 65 SingerId INT64 NOT NULL, 66 FirstName STRING(1024), 67 LastName STRING(1024), 68 SingerInfo BYTES(MAX) 69 ) PRIMARY KEY (SingerId)`, 70 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 71 `CREATE TABLE Accounts ( 72 AccountId INT64 NOT NULL, 73 Nickname STRING(100), 74 Balance INT64 NOT NULL, 75 ) PRIMARY KEY (AccountId)`, 76 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 77 `CREATE TABLE Types ( 78 RowID INT64 NOT NULL, 79 String STRING(MAX), 80 StringArray ARRAY<STRING(MAX)>, 81 Bytes BYTES(MAX), 82 BytesArray ARRAY<BYTES(MAX)>, 83 Int64a INT64, 84 Int64Array ARRAY<INT64>, 85 Bool BOOL, 86 BoolArray ARRAY<BOOL>, 87 Float64 FLOAT64, 88 Float64Array ARRAY<FLOAT64>, 89 Date DATE, 90 DateArray ARRAY<DATE>, 91 Timestamp TIMESTAMP, 92 TimestampArray ARRAY<TIMESTAMP>, 93 ) PRIMARY KEY (RowID)`, 94 } 95 96 readDBStatements = []string{ 97 `CREATE TABLE TestTable ( 98 Key STRING(MAX) NOT NULL, 99 StringValue STRING(MAX) 100 ) PRIMARY KEY (Key)`, 101 `CREATE INDEX TestTableByValue ON TestTable(StringValue)`, 102 `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`, 103 } 104 105 simpleDBStatements = []string{ 106 `CREATE TABLE test ( 107 a STRING(1024), 108 b STRING(1024), 109 ) PRIMARY KEY (a)`, 110 } 111 simpleDBTableColumns = []string{"a", "b"} 112 113 ctsDBStatements = []string{ 114 `CREATE TABLE TestTable ( 115 Key STRING(MAX) NOT NULL, 116 Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true), 117 ) PRIMARY KEY (Key)`, 118 } 119) 120 121const ( 122 str1 = "alice" 123 str2 = "a@example.com" 124) 125 126func TestMain(m *testing.M) { 127 cleanup := initIntegrationTests() 128 res := m.Run() 129 cleanup() 130 os.Exit(res) 131} 132 133func initIntegrationTests() func() { 134 ctx := context.Background() 135 flag.Parse() // Needed for testing.Short(). 136 noop := func() {} 137 138 if testing.Short() { 139 log.Println("Integration tests skipped in -short mode.") 140 return noop 141 } 142 143 if testProjectID == "" { 144 log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 145 return noop 146 } 147 148 ts := testutil.TokenSource(ctx, AdminScope, Scope) 149 if ts == nil { 150 log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY") 151 return noop 152 } 153 var err error 154 155 // Create Admin client and Data client. 156 admin, err = database.NewDatabaseAdminClient(ctx, option.WithTokenSource(ts), option.WithEndpoint(endpoint)) 157 if err != nil { 158 log.Fatalf("cannot create admin client: %v", err) 159 } 160 161 return func() { 162 cleanupDatabases() 163 admin.Close() 164 } 165} 166 167// Test SingleUse transaction. 168func TestIntegration_SingleUse(t *testing.T) { 169 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) 170 defer cancel() 171 // Set up testing environment. 172 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 173 defer cleanup() 174 175 writes := []struct { 176 row []interface{} 177 ts time.Time 178 }{ 179 {row: []interface{}{1, "Marc", "Foo"}}, 180 {row: []interface{}{2, "Tars", "Bar"}}, 181 {row: []interface{}{3, "Alpha", "Beta"}}, 182 {row: []interface{}{4, "Last", "End"}}, 183 } 184 // Try to write four rows through the Apply API. 185 for i, w := range writes { 186 var err error 187 m := InsertOrUpdate("Singers", 188 []string{"SingerId", "FirstName", "LastName"}, 189 w.row) 190 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 191 t.Fatal(err) 192 } 193 } 194 195 // Test reading rows with different timestamp bounds. 196 for i, test := range []struct { 197 name string 198 want [][]interface{} 199 tb TimestampBound 200 checkTs func(time.Time) error 201 }{ 202 { 203 name: "strong", 204 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 205 tb: StrongRead(), 206 checkTs: func(ts time.Time) error { 207 // writes[3] is the last write, all subsequent strong read 208 // should have a timestamp larger than that. 209 if ts.Before(writes[3].ts) { 210 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 211 } 212 return nil 213 }, 214 }, 215 { 216 name: "min_read_timestamp", 217 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 218 tb: MinReadTimestamp(writes[3].ts), 219 checkTs: func(ts time.Time) error { 220 if ts.Before(writes[3].ts) { 221 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 222 } 223 return nil 224 }, 225 }, 226 { 227 name: "max_staleness", 228 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 229 tb: MaxStaleness(time.Second), 230 checkTs: func(ts time.Time) error { 231 if ts.Before(writes[3].ts) { 232 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 233 } 234 return nil 235 }, 236 }, 237 { 238 name: "read_timestamp", 239 want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 240 tb: ReadTimestamp(writes[2].ts), 241 checkTs: func(ts time.Time) error { 242 if ts != writes[2].ts { 243 return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts) 244 } 245 return nil 246 }, 247 }, 248 { 249 name: "exact_staleness", 250 want: nil, 251 // Specify a staleness which should be already before this test because 252 // context timeout is set to be 10s. 253 tb: ExactStaleness(11 * time.Second), 254 checkTs: func(ts time.Time) error { 255 if ts.After(writes[0].ts) { 256 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 257 } 258 return nil 259 }, 260 }, 261 } { 262 t.Run(test.name, func(t *testing.T) { 263 // SingleUse.Query 264 su := client.Single().WithTimestampBound(test.tb) 265 got, err := readAll(su.Query( 266 ctx, 267 Statement{ 268 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 269 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 270 })) 271 if err != nil { 272 t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) 273 } 274 if !testEqual(got, test.want) { 275 t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want) 276 } 277 rts, err := su.Timestamp() 278 if err != nil { 279 t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) 280 } 281 if err := test.checkTs(rts); err != nil { 282 t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err) 283 } 284 // SingleUse.Read 285 su = client.Single().WithTimestampBound(test.tb) 286 got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 287 if err != nil { 288 t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) 289 } 290 if !testEqual(got, test.want) { 291 t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want) 292 } 293 rts, err = su.Timestamp() 294 if err != nil { 295 t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) 296 } 297 if err := test.checkTs(rts); err != nil { 298 t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err) 299 } 300 // SingleUse.ReadRow 301 got = nil 302 for _, k := range []Key{{1}, {3}, {4}} { 303 su = client.Single().WithTimestampBound(test.tb) 304 r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 305 if err != nil { 306 continue 307 } 308 v, err := rowToValues(r) 309 if err != nil { 310 continue 311 } 312 got = append(got, v) 313 rts, err = su.Timestamp() 314 if err != nil { 315 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 316 } 317 if err := test.checkTs(rts); err != nil { 318 t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 319 } 320 } 321 if !testEqual(got, test.want) { 322 t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want) 323 } 324 // SingleUse.ReadUsingIndex 325 su = client.Single().WithTimestampBound(test.tb) 326 got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 327 if err != nil { 328 t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) 329 } 330 // The results from ReadUsingIndex is sorted by the index rather than primary key. 331 if len(got) != len(test.want) { 332 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 333 } 334 for j, g := range got { 335 if j > 0 { 336 prev := got[j-1][1].(string) + got[j-1][2].(string) 337 curr := got[j][1].(string) + got[j][2].(string) 338 if strings.Compare(prev, curr) > 0 { 339 t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 340 } 341 } 342 found := false 343 for _, w := range test.want { 344 if testEqual(g, w) { 345 found = true 346 } 347 } 348 if !found { 349 t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 350 break 351 } 352 } 353 rts, err = su.Timestamp() 354 if err != nil { 355 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 356 } 357 if err := test.checkTs(rts); err != nil { 358 t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 359 } 360 }) 361 } 362} 363 364func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) { 365 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) 366 defer cancel() 367 // Set up testing environment. 368 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 369 defer cleanup() 370 371 writes := []struct { 372 row []interface{} 373 ts time.Time 374 }{ 375 {row: []interface{}{1, "Marc", "Foo"}}, 376 {row: []interface{}{2, "Tars", "Bar"}}, 377 {row: []interface{}{3, "Alpha", "Beta"}}, 378 {row: []interface{}{4, "Last", "End"}}, 379 } 380 // Try to write four rows through the Apply API. 381 for i, w := range writes { 382 var err error 383 m := InsertOrUpdate("Singers", 384 []string{"SingerId", "FirstName", "LastName"}, 385 w.row) 386 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 387 t.Fatal(err) 388 } 389 } 390 391 su := client.Single() 392 const limit = 1 393 gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), 394 []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit})) 395 if err != nil { 396 t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err) 397 } 398 if got, want := len(gotRows), limit; got != want { 399 t.Errorf("got %d, want %d", got, want) 400 } 401} 402 403// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it 404// also tests for a single timestamp across multiple reads. 405func TestIntegration_ReadOnlyTransaction(t *testing.T) { 406 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 407 defer cancel() 408 // Set up testing environment. 409 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 410 defer cleanup() 411 412 writes := []struct { 413 row []interface{} 414 ts time.Time 415 }{ 416 {row: []interface{}{1, "Marc", "Foo"}}, 417 {row: []interface{}{2, "Tars", "Bar"}}, 418 {row: []interface{}{3, "Alpha", "Beta"}}, 419 {row: []interface{}{4, "Last", "End"}}, 420 } 421 // Try to write four rows through the Apply API. 422 for i, w := range writes { 423 var err error 424 m := InsertOrUpdate("Singers", 425 []string{"SingerId", "FirstName", "LastName"}, 426 w.row) 427 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 428 t.Fatal(err) 429 } 430 } 431 432 // For testing timestamp bound staleness. 433 <-time.After(time.Second) 434 435 // Test reading rows with different timestamp bounds. 436 for i, test := range []struct { 437 want [][]interface{} 438 tb TimestampBound 439 checkTs func(time.Time) error 440 }{ 441 // Note: min_read_timestamp and max_staleness are not supported by 442 // ReadOnlyTransaction. See API document for more details. 443 { 444 // strong 445 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 446 StrongRead(), 447 func(ts time.Time) error { 448 if ts.Before(writes[3].ts) { 449 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 450 } 451 return nil 452 }, 453 }, 454 { 455 // read_timestamp 456 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 457 ReadTimestamp(writes[2].ts), 458 func(ts time.Time) error { 459 if ts != writes[2].ts { 460 return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts) 461 } 462 return nil 463 }, 464 }, 465 { 466 // exact_staleness 467 nil, 468 // Specify a staleness which should be already before this test 469 // because context timeout is set to be 10s. 470 ExactStaleness(11 * time.Second), 471 func(ts time.Time) error { 472 if ts.After(writes[0].ts) { 473 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 474 } 475 return nil 476 }, 477 }, 478 } { 479 // ReadOnlyTransaction.Query 480 ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb) 481 got, err := readAll(ro.Query( 482 ctx, 483 Statement{ 484 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 485 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 486 })) 487 if err != nil { 488 t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err) 489 } 490 if !testEqual(got, test.want) { 491 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want) 492 } 493 rts, err := ro.Timestamp() 494 if err != nil { 495 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err) 496 } 497 if err := test.checkTs(rts); err != nil { 498 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err) 499 } 500 roTs := rts 501 // ReadOnlyTransaction.Read 502 got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 503 if err != nil { 504 t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err) 505 } 506 if !testEqual(got, test.want) { 507 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want) 508 } 509 rts, err = ro.Timestamp() 510 if err != nil { 511 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err) 512 } 513 if err := test.checkTs(rts); err != nil { 514 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err) 515 } 516 if roTs != rts { 517 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 518 } 519 // ReadOnlyTransaction.ReadRow 520 got = nil 521 for _, k := range []Key{{1}, {3}, {4}} { 522 r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 523 if err != nil { 524 continue 525 } 526 v, err := rowToValues(r) 527 if err != nil { 528 continue 529 } 530 got = append(got, v) 531 rts, err = ro.Timestamp() 532 if err != nil { 533 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 534 } 535 if err := test.checkTs(rts); err != nil { 536 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 537 } 538 if roTs != rts { 539 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 540 } 541 } 542 if !testEqual(got, test.want) { 543 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want) 544 } 545 // SingleUse.ReadUsingIndex 546 got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 547 if err != nil { 548 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err) 549 } 550 // The results from ReadUsingIndex is sorted by the index rather than 551 // primary key. 552 if len(got) != len(test.want) { 553 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 554 } 555 for j, g := range got { 556 if j > 0 { 557 prev := got[j-1][1].(string) + got[j-1][2].(string) 558 curr := got[j][1].(string) + got[j][2].(string) 559 if strings.Compare(prev, curr) > 0 { 560 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 561 } 562 } 563 found := false 564 for _, w := range test.want { 565 if testEqual(g, w) { 566 found = true 567 } 568 } 569 if !found { 570 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 571 break 572 } 573 } 574 rts, err = ro.Timestamp() 575 if err != nil { 576 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 577 } 578 if err := test.checkTs(rts); err != nil { 579 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 580 } 581 if roTs != rts { 582 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 583 } 584 ro.Close() 585 } 586} 587 588// Test ReadOnlyTransaction with different timestamp bound when there's an 589// update at the same time. 590func TestIntegration_UpdateDuringRead(t *testing.T) { 591 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 592 defer cancel() 593 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 594 defer cleanup() 595 596 for i, tb := range []TimestampBound{ 597 StrongRead(), 598 ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour 599 ExactStaleness(time.Minute * 30), 600 } { 601 ro := client.ReadOnlyTransaction().WithTimestampBound(tb) 602 _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 603 if ErrCode(err) != codes.NotFound { 604 t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err) 605 } 606 607 m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i}) 608 if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 609 t.Fatal(err) 610 } 611 612 _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 613 if ErrCode(err) != codes.NotFound { 614 t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err) 615 } 616 } 617} 618 619// Test ReadWriteTransaction. 620func TestIntegration_ReadWriteTransaction(t *testing.T) { 621 // Give a longer deadline because of transaction backoffs. 622 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) 623 defer cancel() 624 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 625 defer cleanup() 626 627 // Set up two accounts 628 accounts := []*Mutation{ 629 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 630 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 631 } 632 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 633 t.Fatal(err) 634 } 635 wg := sync.WaitGroup{} 636 637 readBalance := func(iter *RowIterator) (int64, error) { 638 defer iter.Stop() 639 var bal int64 640 for { 641 row, err := iter.Next() 642 if err == iterator.Done { 643 return bal, nil 644 } 645 if err != nil { 646 return 0, err 647 } 648 if err := row.Column(0, &bal); err != nil { 649 return 0, err 650 } 651 } 652 } 653 654 for i := 0; i < 20; i++ { 655 wg.Add(1) 656 go func(iter int) { 657 defer wg.Done() 658 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 659 // Query Foo's balance and Bar's balance. 660 bf, e := readBalance(tx.Query(ctx, 661 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 662 if e != nil { 663 return e 664 } 665 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 666 if e != nil { 667 return e 668 } 669 if bf <= 0 { 670 return nil 671 } 672 bf-- 673 bb++ 674 return tx.BufferWrite([]*Mutation{ 675 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 676 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 677 }) 678 }) 679 if err != nil { 680 t.Errorf("%d: failed to execute transaction: %v", iter, err) 681 } 682 }(i) 683 } 684 // Because of context timeout, all goroutines will eventually return. 685 wg.Wait() 686 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 687 var bf, bb int64 688 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"}) 689 if e != nil { 690 return e 691 } 692 if ce := r.Column(0, &bf); ce != nil { 693 return ce 694 } 695 bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"})) 696 if e != nil { 697 return e 698 } 699 if bf != 30 || bb != 21 { 700 t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) 701 } 702 return nil 703 }) 704 if err != nil { 705 t.Errorf("failed to check balances: %v", err) 706 } 707} 708 709func TestIntegration_Reads(t *testing.T) { 710 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 711 defer cancel() 712 // Set up testing environment. 713 client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements) 714 defer cleanup() 715 716 // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2". 717 var ms []*Mutation 718 for i := 0; i < 15; i++ { 719 ms = append(ms, InsertOrUpdate(testTable, 720 testTableColumns, 721 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 722 } 723 // Don't use ApplyAtLeastOnce, so we can test the other code path. 724 if _, err := client.Apply(ctx, ms); err != nil { 725 t.Fatal(err) 726 } 727 728 // Empty read. 729 rows, err := readAllTestTable(client.Single().Read(ctx, testTable, 730 KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns)) 731 if err != nil { 732 t.Fatal(err) 733 } 734 if got, want := len(rows), 0; got != want { 735 t.Errorf("got %d, want %d", got, want) 736 } 737 738 // Index empty read. 739 rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, 740 KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns)) 741 if err != nil { 742 t.Fatal(err) 743 } 744 if got, want := len(rows), 0; got != want { 745 t.Errorf("got %d, want %d", got, want) 746 } 747 748 // Point read. 749 row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns) 750 if err != nil { 751 t.Fatal(err) 752 } 753 var got testTableRow 754 if err := row.ToStruct(&got); err != nil { 755 t.Fatal(err) 756 } 757 if want := (testTableRow{"k1", "v1"}); got != want { 758 t.Errorf("got %v, want %v", got, want) 759 } 760 761 // Point read not found. 762 _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns) 763 if ErrCode(err) != codes.NotFound { 764 t.Fatalf("got %v, want NotFound", err) 765 } 766 767 // No index point read not found, because Go does not have ReadRowUsingIndex. 768 769 rangeReads(ctx, t, client) 770 indexRangeReads(ctx, t, client) 771} 772 773func TestIntegration_EarlyTimestamp(t *testing.T) { 774 // Test that we can get the timestamp from a read-only transaction as 775 // soon as we have read at least one row. 776 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) 777 defer cancel() 778 // Set up testing environment. 779 client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements) 780 defer cleanup() 781 782 var ms []*Mutation 783 for i := 0; i < 3; i++ { 784 ms = append(ms, InsertOrUpdate(testTable, 785 testTableColumns, 786 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 787 } 788 if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil { 789 t.Fatal(err) 790 } 791 792 txn := client.Single() 793 iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns) 794 defer iter.Stop() 795 // In single-use transaction, we should get an error before reading anything. 796 if _, err := txn.Timestamp(); err == nil { 797 t.Error("wanted error, got nil") 798 } 799 // After reading one row, the timestamp should be available. 800 _, err := iter.Next() 801 if err != nil { 802 t.Fatal(err) 803 } 804 if _, err := txn.Timestamp(); err != nil { 805 t.Errorf("got %v, want nil", err) 806 } 807 808 txn = client.ReadOnlyTransaction() 809 defer txn.Close() 810 iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns) 811 defer iter.Stop() 812 // In an ordinary read-only transaction, the timestamp should be 813 // available immediately. 814 if _, err := txn.Timestamp(); err != nil { 815 t.Errorf("got %v, want nil", err) 816 } 817} 818 819func TestIntegration_NestedTransaction(t *testing.T) { 820 // You cannot use a transaction from inside a read-write transaction. 821 ctx := context.Background() 822 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 823 defer cleanup() 824 825 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 826 _, err := client.ReadWriteTransaction(ctx, 827 func(context.Context, *ReadWriteTransaction) error { return nil }) 828 if ErrCode(err) != codes.FailedPrecondition { 829 t.Fatalf("got %v, want FailedPrecondition", err) 830 } 831 _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 832 if ErrCode(err) != codes.FailedPrecondition { 833 t.Fatalf("got %v, want FailedPrecondition", err) 834 } 835 rot := client.ReadOnlyTransaction() 836 defer rot.Close() 837 _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 838 if ErrCode(err) != codes.FailedPrecondition { 839 t.Fatalf("got %v, want FailedPrecondition", err) 840 } 841 return nil 842 }) 843 if err != nil { 844 t.Fatal(err) 845 } 846} 847 848// Test client recovery on database recreation. 849func TestIntegration_DbRemovalRecovery(t *testing.T) { 850 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) 851 defer cancel() 852 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 853 defer cleanup() 854 855 // Drop the testing database. 856 if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 857 t.Fatalf("failed to drop testing database %v: %v", dbPath, err) 858 } 859 860 // Now, send the query. 861 iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 862 defer iter.Stop() 863 if _, err := iter.Next(); err == nil { 864 t.Errorf("client sends query to removed database successfully, want it to fail") 865 } 866 867 // Recreate database and table. 868 dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] 869 op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 870 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 871 CreateStatement: "CREATE DATABASE " + dbName, 872 ExtraStatements: []string{ 873 `CREATE TABLE Singers ( 874 SingerId INT64 NOT NULL, 875 FirstName STRING(1024), 876 LastName STRING(1024), 877 SingerInfo BYTES(MAX) 878 ) PRIMARY KEY (SingerId)`, 879 }, 880 }) 881 if err != nil { 882 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 883 } 884 if _, err := op.Wait(ctx); err != nil { 885 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 886 } 887 888 // Now, send the query again. 889 iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 890 defer iter.Stop() 891 _, err = iter.Next() 892 if err != nil && err != iterator.Done { 893 t.Errorf("failed to send query to database %v: %v", dbPath, err) 894 } 895} 896 897// Test encoding/decoding non-struct Cloud Spanner types. 898func TestIntegration_BasicTypes(t *testing.T) { 899 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 900 defer cancel() 901 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 902 defer cleanup() 903 904 t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z") 905 // Boundaries 906 t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z") 907 t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z") 908 d1, _ := civil.ParseDate("2016-11-15") 909 // Boundaries 910 d2, _ := civil.ParseDate("0001-01-01") 911 d3, _ := civil.ParseDate("9999-12-31") 912 913 tests := []struct { 914 col string 915 val interface{} 916 want interface{} 917 }{ 918 {col: "String", val: ""}, 919 {col: "String", val: "", want: NullString{"", true}}, 920 {col: "String", val: "foo"}, 921 {col: "String", val: "foo", want: NullString{"foo", true}}, 922 {col: "String", val: NullString{"bar", true}, want: "bar"}, 923 {col: "String", val: NullString{"bar", false}, want: NullString{"", false}}, 924 {col: "String", val: nil, want: NullString{}}, 925 {col: "StringArray", val: []string(nil), want: []NullString(nil)}, 926 {col: "StringArray", val: []string{}, want: []NullString{}}, 927 {col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}}, 928 {col: "StringArray", val: []NullString(nil)}, 929 {col: "StringArray", val: []NullString{}}, 930 {col: "StringArray", val: []NullString{{"foo", true}, {}}}, 931 {col: "Bytes", val: []byte{}}, 932 {col: "Bytes", val: []byte{1, 2, 3}}, 933 {col: "Bytes", val: []byte(nil)}, 934 {col: "BytesArray", val: [][]byte(nil)}, 935 {col: "BytesArray", val: [][]byte{}}, 936 {col: "BytesArray", val: [][]byte{{1}, {2, 3}}}, 937 {col: "Int64a", val: 0, want: int64(0)}, 938 {col: "Int64a", val: -1, want: int64(-1)}, 939 {col: "Int64a", val: 2, want: int64(2)}, 940 {col: "Int64a", val: int64(3)}, 941 {col: "Int64a", val: 4, want: NullInt64{4, true}}, 942 {col: "Int64a", val: NullInt64{5, true}, want: int64(5)}, 943 {col: "Int64a", val: NullInt64{6, true}, want: int64(6)}, 944 {col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}}, 945 {col: "Int64a", val: nil, want: NullInt64{}}, 946 {col: "Int64Array", val: []int(nil), want: []NullInt64(nil)}, 947 {col: "Int64Array", val: []int{}, want: []NullInt64{}}, 948 {col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 949 {col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)}, 950 {col: "Int64Array", val: []int64{}, want: []NullInt64{}}, 951 {col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 952 {col: "Int64Array", val: []NullInt64(nil)}, 953 {col: "Int64Array", val: []NullInt64{}}, 954 {col: "Int64Array", val: []NullInt64{{1, true}, {}}}, 955 {col: "Bool", val: false}, 956 {col: "Bool", val: true}, 957 {col: "Bool", val: false, want: NullBool{false, true}}, 958 {col: "Bool", val: true, want: NullBool{true, true}}, 959 {col: "Bool", val: NullBool{true, true}}, 960 {col: "Bool", val: NullBool{false, false}}, 961 {col: "Bool", val: nil, want: NullBool{}}, 962 {col: "BoolArray", val: []bool(nil), want: []NullBool(nil)}, 963 {col: "BoolArray", val: []bool{}, want: []NullBool{}}, 964 {col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}}, 965 {col: "BoolArray", val: []NullBool(nil)}, 966 {col: "BoolArray", val: []NullBool{}}, 967 {col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}}, 968 {col: "Float64", val: 0.0}, 969 {col: "Float64", val: 3.14}, 970 {col: "Float64", val: math.NaN()}, 971 {col: "Float64", val: math.Inf(1)}, 972 {col: "Float64", val: math.Inf(-1)}, 973 {col: "Float64", val: 2.78, want: NullFloat64{2.78, true}}, 974 {col: "Float64", val: NullFloat64{2.71, true}, want: 2.71}, 975 {col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}}, 976 {col: "Float64", val: NullFloat64{0, false}}, 977 {col: "Float64", val: nil, want: NullFloat64{}}, 978 {col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)}, 979 {col: "Float64Array", val: []float64{}, want: []NullFloat64{}}, 980 {col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}}, 981 {col: "Float64Array", val: []NullFloat64(nil)}, 982 {col: "Float64Array", val: []NullFloat64{}}, 983 {col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}}, 984 {col: "Date", val: d1}, 985 {col: "Date", val: d1, want: NullDate{d1, true}}, 986 {col: "Date", val: NullDate{d1, true}}, 987 {col: "Date", val: NullDate{d1, true}, want: d1}, 988 {col: "Date", val: NullDate{civil.Date{}, false}}, 989 {col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)}, 990 {col: "DateArray", val: []civil.Date{}, want: []NullDate{}}, 991 {col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}}, 992 {col: "Timestamp", val: t1}, 993 {col: "Timestamp", val: t1, want: NullTime{t1, true}}, 994 {col: "Timestamp", val: NullTime{t1, true}}, 995 {col: "Timestamp", val: NullTime{t1, true}, want: t1}, 996 {col: "Timestamp", val: NullTime{}}, 997 {col: "Timestamp", val: nil, want: NullTime{}}, 998 {col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)}, 999 {col: "TimestampArray", val: []time.Time{}, want: []NullTime{}}, 1000 {col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}}, 1001 } 1002 1003 // Write rows into table first. 1004 var muts []*Mutation 1005 for i, test := range tests { 1006 muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val})) 1007 } 1008 if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil { 1009 t.Fatal(err) 1010 } 1011 1012 for i, test := range tests { 1013 row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col}) 1014 if err != nil { 1015 t.Fatalf("Unable to fetch row %v: %v", i, err) 1016 } 1017 // Create new instance of type of test.want. 1018 want := test.want 1019 if want == nil { 1020 want = test.val 1021 } 1022 gotp := reflect.New(reflect.TypeOf(want)) 1023 if err := row.Column(0, gotp.Interface()); err != nil { 1024 t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err) 1025 continue 1026 } 1027 got := reflect.Indirect(gotp).Interface() 1028 1029 // One of the test cases is checking NaN handling. Given 1030 // NaN!=NaN, we can't use reflect to test for it. 1031 if isNaN(got) && isNaN(want) { 1032 continue 1033 } 1034 1035 // Check non-NaN cases. 1036 if !testEqual(got, want) { 1037 t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want) 1038 continue 1039 } 1040 } 1041} 1042 1043// Test decoding Cloud Spanner STRUCT type. 1044func TestIntegration_StructTypes(t *testing.T) { 1045 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1046 defer cancel() 1047 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1048 defer cleanup() 1049 1050 tests := []struct { 1051 q Statement 1052 want func(r *Row) error 1053 }{ 1054 { 1055 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`}, 1056 want: func(r *Row) error { 1057 // Test STRUCT ARRAY decoding to []NullRow. 1058 var rows []NullRow 1059 if err := r.Column(0, &rows); err != nil { 1060 return err 1061 } 1062 if len(rows) != 1 { 1063 return fmt.Errorf("len(rows) = %d; want 1", len(rows)) 1064 } 1065 if !rows[0].Valid { 1066 return fmt.Errorf("rows[0] is NULL") 1067 } 1068 var i, j int64 1069 if err := rows[0].Row.Columns(&i, &j); err != nil { 1070 return err 1071 } 1072 if i != 1 || j != 2 { 1073 return fmt.Errorf("got (%d,%d), want (1,2)", i, j) 1074 } 1075 return nil 1076 }, 1077 }, 1078 { 1079 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`}, 1080 want: func(r *Row) error { 1081 // Test Row.ToStruct. 1082 s := struct { 1083 Col1 []*struct { 1084 Foo int64 `spanner:"foo"` 1085 Bar int64 `spanner:"bar"` 1086 } `spanner:"col1"` 1087 }{} 1088 if err := r.ToStruct(&s); err != nil { 1089 return err 1090 } 1091 want := struct { 1092 Col1 []*struct { 1093 Foo int64 `spanner:"foo"` 1094 Bar int64 `spanner:"bar"` 1095 } `spanner:"col1"` 1096 }{ 1097 Col1: []*struct { 1098 Foo int64 `spanner:"foo"` 1099 Bar int64 `spanner:"bar"` 1100 }{ 1101 { 1102 Foo: 1, 1103 Bar: 2, 1104 }, 1105 }, 1106 } 1107 if !testEqual(want, s) { 1108 return fmt.Errorf("unexpected decoding result: %v, want %v", s, want) 1109 } 1110 return nil 1111 }, 1112 }, 1113 } 1114 for i, test := range tests { 1115 iter := client.Single().Query(ctx, test.q) 1116 defer iter.Stop() 1117 row, err := iter.Next() 1118 if err != nil { 1119 t.Errorf("%d: %v", i, err) 1120 continue 1121 } 1122 if err := test.want(row); err != nil { 1123 t.Errorf("%d: %v", i, err) 1124 continue 1125 } 1126 } 1127} 1128 1129func TestIntegration_StructParametersUnsupported(t *testing.T) { 1130 ctx := context.Background() 1131 client, _, cleanup := prepareIntegrationTest(ctx, t, nil) 1132 defer cleanup() 1133 1134 for _, test := range []struct { 1135 param interface{} 1136 wantCode codes.Code 1137 wantMsgPart string 1138 }{ 1139 { 1140 struct { 1141 Field int 1142 }{10}, 1143 codes.Unimplemented, 1144 "Unsupported query shape: " + 1145 "A struct value cannot be returned as a column value. " + 1146 "Rewrite the query to flatten the struct fields in the result.", 1147 }, 1148 { 1149 []struct { 1150 Field int 1151 }{{10}, {20}}, 1152 codes.Unimplemented, 1153 "Unsupported query shape: " + 1154 "This query can return a null-valued array of struct, " + 1155 "which is not supported by Spanner.", 1156 }, 1157 } { 1158 iter := client.Single().Query(ctx, Statement{ 1159 SQL: "SELECT @p", 1160 Params: map[string]interface{}{"p": test.param}, 1161 }) 1162 _, err := iter.Next() 1163 iter.Stop() 1164 if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok { 1165 t.Fatal(msg) 1166 } 1167 } 1168} 1169 1170// Test queries of the form "SELECT expr". 1171func TestIntegration_QueryExpressions(t *testing.T) { 1172 ctx := context.Background() 1173 client, _, cleanup := prepareIntegrationTest(ctx, t, nil) 1174 defer cleanup() 1175 1176 newRow := func(vals []interface{}) *Row { 1177 row, err := NewRow(make([]string, len(vals)), vals) 1178 if err != nil { 1179 t.Fatal(err) 1180 } 1181 return row 1182 } 1183 1184 tests := []struct { 1185 expr string 1186 want interface{} 1187 }{ 1188 {"1", int64(1)}, 1189 {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}}, 1190 {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}}, 1191 {"IEEE_DIVIDE(1, 0)", math.Inf(1)}, 1192 {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)}, 1193 {"IEEE_DIVIDE(0, 0)", math.NaN()}, 1194 // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate. 1195 {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}}, 1196 {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}}, 1197 {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}}, 1198 } 1199 for _, test := range tests { 1200 iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr}) 1201 defer iter.Stop() 1202 row, err := iter.Next() 1203 if err != nil { 1204 t.Errorf("%q: %v", test.expr, err) 1205 continue 1206 } 1207 // Create new instance of type of test.want. 1208 gotp := reflect.New(reflect.TypeOf(test.want)) 1209 if err := row.Column(0, gotp.Interface()); err != nil { 1210 t.Errorf("%q: Column returned error %v", test.expr, err) 1211 continue 1212 } 1213 got := reflect.Indirect(gotp).Interface() 1214 // TODO(jba): remove isNaN special case when we have a better equality predicate. 1215 if isNaN(got) && isNaN(test.want) { 1216 continue 1217 } 1218 if !testEqual(got, test.want) { 1219 t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want) 1220 } 1221 } 1222} 1223 1224func TestIntegration_QueryStats(t *testing.T) { 1225 ctx := context.Background() 1226 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1227 defer cleanup() 1228 1229 accounts := []*Mutation{ 1230 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1231 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1232 } 1233 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1234 t.Fatal(err) 1235 } 1236 const sql = "SELECT Balance FROM Accounts" 1237 1238 qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil}) 1239 if err != nil { 1240 t.Fatal(err) 1241 } 1242 if len(qp.PlanNodes) == 0 { 1243 t.Error("got zero plan nodes, expected at least one") 1244 } 1245 1246 iter := client.Single().QueryWithStats(ctx, Statement{sql, nil}) 1247 defer iter.Stop() 1248 for { 1249 _, err := iter.Next() 1250 if err == iterator.Done { 1251 break 1252 } 1253 if err != nil { 1254 t.Fatal(err) 1255 } 1256 } 1257 if iter.QueryPlan == nil { 1258 t.Error("got nil QueryPlan, expected one") 1259 } 1260 if iter.QueryStats == nil { 1261 t.Error("got nil QueryStats, expected some") 1262 } 1263} 1264 1265func TestIntegration_InvalidDatabase(t *testing.T) { 1266 if testProjectID == "" { 1267 t.Skip("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 1268 } 1269 ctx := context.Background() 1270 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID) 1271 c, err := createClient(ctx, dbPath) 1272 // Client creation should succeed even if the database is invalid. 1273 if err != nil { 1274 t.Fatal(err) 1275 } 1276 _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"}) 1277 if msg, ok := matchError(err, codes.NotFound, ""); !ok { 1278 t.Fatal(msg) 1279 } 1280} 1281 1282func TestIntegration_ReadErrors(t *testing.T) { 1283 ctx := context.Background() 1284 client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements) 1285 defer cleanup() 1286 1287 // Read over invalid table fails 1288 _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"}) 1289 if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok { 1290 t.Error(msg) 1291 } 1292 // Read over invalid column fails 1293 _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"}) 1294 if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok { 1295 t.Error(msg) 1296 } 1297 1298 // Invalid query fails 1299 iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"}) 1300 defer iter.Stop() 1301 _, err = iter.Next() 1302 if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok { 1303 t.Error(msg) 1304 } 1305 1306 // Read should fail on cancellation. 1307 cctx, cancel := context.WithCancel(ctx) 1308 cancel() 1309 _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"}) 1310 if msg, ok := matchError(err, codes.Canceled, ""); !ok { 1311 t.Error(msg) 1312 } 1313 // Read should fail if deadline exceeded. 1314 dctx, cancel := context.WithTimeout(ctx, time.Nanosecond) 1315 defer cancel() 1316 <-dctx.Done() 1317 _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"}) 1318 if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok { 1319 t.Error(msg) 1320 } 1321} 1322 1323// Test TransactionRunner. Test that transactions are aborted and retried as 1324// expected. 1325func TestIntegration_TransactionRunner(t *testing.T) { 1326 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1327 defer cancel() 1328 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1329 defer cleanup() 1330 1331 // Test 1: User error should abort the transaction. 1332 _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1333 tx.BufferWrite([]*Mutation{ 1334 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})}) 1335 return errors.New("user error") 1336 }) 1337 // Empty read. 1338 rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"})) 1339 if err != nil { 1340 t.Fatal(err) 1341 } 1342 if got, want := len(rows), 0; got != want { 1343 t.Errorf("Empty read, got %d, want %d.", got, want) 1344 } 1345 1346 // Test 2: Expect abort and retry. 1347 // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by 1348 // committing writes to the column txn2 have read, and expect the following 1349 // read to abort and txn2 retries. 1350 1351 // Set up two accounts 1352 accounts := []*Mutation{ 1353 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}), 1354 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}), 1355 } 1356 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1357 t.Fatal(err) 1358 } 1359 1360 var ( 1361 cTxn1Start = make(chan struct{}) 1362 cTxn1Commit = make(chan struct{}) 1363 cTxn2Start = make(chan struct{}) 1364 wg sync.WaitGroup 1365 ) 1366 1367 // read balance, check error if we don't expect abort. 1368 readBalance := func(tx interface { 1369 ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) 1370 }, key int64, expectAbort bool) (int64, error) { 1371 var b int64 1372 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"}) 1373 if e != nil { 1374 if expectAbort && !isAbortErr(e) { 1375 t.Errorf("ReadRow got %v, want Abort error.", e) 1376 } 1377 return b, e 1378 } 1379 if ce := r.Column(0, &b); ce != nil { 1380 return b, ce 1381 } 1382 return b, nil 1383 } 1384 1385 wg.Add(2) 1386 // Txn 1 1387 go func() { 1388 defer wg.Done() 1389 var once sync.Once 1390 _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1391 b, e := readBalance(tx, 1, false) 1392 if e != nil { 1393 return e 1394 } 1395 // txn 1 can abort, in that case we skip closing the channel on 1396 // retry. 1397 once.Do(func() { close(cTxn1Start) }) 1398 e = tx.BufferWrite([]*Mutation{ 1399 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})}) 1400 if e != nil { 1401 return e 1402 } 1403 // Wait for second transaction. 1404 <-cTxn2Start 1405 return nil 1406 }) 1407 close(cTxn1Commit) 1408 if e != nil { 1409 t.Errorf("Transaction 1 commit, got %v, want nil.", e) 1410 } 1411 }() 1412 // Txn 2 1413 go func() { 1414 // Wait until txn 1 starts. 1415 <-cTxn1Start 1416 defer wg.Done() 1417 var ( 1418 once sync.Once 1419 b1 int64 1420 b2 int64 1421 e error 1422 ) 1423 _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1424 if b1, e = readBalance(tx, 1, false); e != nil { 1425 return e 1426 } 1427 // Skip closing channel on retry. 1428 once.Do(func() { close(cTxn2Start) }) 1429 // Wait until txn 1 successfully commits. 1430 <-cTxn1Commit 1431 // Txn1 has committed and written a balance to the account. Now this 1432 // transaction (txn2) reads and re-writes the balance. The first 1433 // time through, it will abort because it overlaps with txn1. Then 1434 // it will retry after txn1 commits, and succeed. 1435 if b2, e = readBalance(tx, 2, true); e != nil { 1436 return e 1437 } 1438 return tx.BufferWrite([]*Mutation{ 1439 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})}) 1440 }) 1441 if e != nil { 1442 t.Errorf("Transaction 2 commit, got %v, want nil.", e) 1443 } 1444 }() 1445 wg.Wait() 1446 // Check that both transactions' effects are visible. 1447 for i := int64(1); i <= int64(2); i++ { 1448 if b, e := readBalance(client.Single(), i, false); e != nil { 1449 t.Fatalf("ReadBalance for key %d error %v.", i, e) 1450 } else if b != i { 1451 t.Errorf("Balance for key %d, got %d, want %d.", i, b, i) 1452 } 1453 } 1454} 1455 1456// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then 1457// serialize and deserialize both transaction and partition to be used in 1458// execution on another client, and compare results. 1459func TestIntegration_BatchQuery(t *testing.T) { 1460 // Set up testing environment. 1461 var ( 1462 client2 *Client 1463 err error 1464 ) 1465 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1466 defer cancel() 1467 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements) 1468 defer cleanup() 1469 1470 if err = populate(ctx, client); err != nil { 1471 t.Fatal(err) 1472 } 1473 if client2, err = createClient(ctx, dbPath); err != nil { 1474 t.Fatal(err) 1475 } 1476 defer client2.Close() 1477 1478 // PartitionQuery 1479 var ( 1480 txn *BatchReadOnlyTransaction 1481 partitions []*Partition 1482 stmt = Statement{SQL: "SELECT * FROM test;"} 1483 ) 1484 1485 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1486 t.Fatal(err) 1487 } 1488 defer txn.Cleanup(ctx) 1489 if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { 1490 t.Fatal(err) 1491 } 1492 1493 // Reconstruct BatchReadOnlyTransactionID and execute partitions 1494 var ( 1495 tid2 BatchReadOnlyTransactionID 1496 data []byte 1497 gotResult bool // if we get matching result from two separate txns 1498 ) 1499 if data, err = txn.ID.MarshalBinary(); err != nil { 1500 t.Fatalf("encoding failed %v", err) 1501 } 1502 if err = tid2.UnmarshalBinary(data); err != nil { 1503 t.Fatalf("decoding failed %v", err) 1504 } 1505 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1506 1507 // Execute Partitions and compare results 1508 for i, p := range partitions { 1509 iter := txn.Execute(ctx, p) 1510 defer iter.Stop() 1511 p2 := serdesPartition(t, i, p) 1512 iter2 := txn2.Execute(ctx, &p2) 1513 defer iter2.Stop() 1514 1515 row1, err1 := iter.Next() 1516 row2, err2 := iter2.Next() 1517 if err1 != err2 { 1518 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1519 continue 1520 } 1521 if !testEqual(row1, row2) { 1522 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1523 continue 1524 } 1525 if row1 == nil { 1526 continue 1527 } 1528 var a, b string 1529 if err = row1.Columns(&a, &b); err != nil { 1530 t.Fatalf("failed to parse row %v", err) 1531 continue 1532 } 1533 if a == str1 && b == str2 { 1534 gotResult = true 1535 } 1536 } 1537 if !gotResult { 1538 t.Fatalf("execution didn't return expected values") 1539 } 1540} 1541 1542// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery 1543func TestIntegration_BatchRead(t *testing.T) { 1544 // Set up testing environment. 1545 var ( 1546 client2 *Client 1547 err error 1548 ) 1549 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1550 defer cancel() 1551 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements) 1552 defer cleanup() 1553 1554 if err = populate(ctx, client); err != nil { 1555 t.Fatal(err) 1556 } 1557 if client2, err = createClient(ctx, dbPath); err != nil { 1558 t.Fatal(err) 1559 } 1560 defer client2.Close() 1561 1562 // PartitionRead 1563 var ( 1564 txn *BatchReadOnlyTransaction 1565 partitions []*Partition 1566 ) 1567 1568 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1569 t.Fatal(err) 1570 } 1571 defer txn.Cleanup(ctx) 1572 if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 1573 t.Fatal(err) 1574 } 1575 1576 // Reconstruct BatchReadOnlyTransactionID and execute partitions. 1577 var ( 1578 tid2 BatchReadOnlyTransactionID 1579 data []byte 1580 gotResult bool // if we get matching result from two separate txns 1581 ) 1582 if data, err = txn.ID.MarshalBinary(); err != nil { 1583 t.Fatalf("encoding failed %v", err) 1584 } 1585 if err = tid2.UnmarshalBinary(data); err != nil { 1586 t.Fatalf("decoding failed %v", err) 1587 } 1588 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1589 1590 // Execute Partitions and compare results. 1591 for i, p := range partitions { 1592 iter := txn.Execute(ctx, p) 1593 defer iter.Stop() 1594 p2 := serdesPartition(t, i, p) 1595 iter2 := txn2.Execute(ctx, &p2) 1596 defer iter2.Stop() 1597 1598 row1, err1 := iter.Next() 1599 row2, err2 := iter2.Next() 1600 if err1 != err2 { 1601 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1602 continue 1603 } 1604 if !testEqual(row1, row2) { 1605 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1606 continue 1607 } 1608 if row1 == nil { 1609 continue 1610 } 1611 var a, b string 1612 if err = row1.Columns(&a, &b); err != nil { 1613 t.Fatalf("failed to parse row %v", err) 1614 continue 1615 } 1616 if a == str1 && b == str2 { 1617 gotResult = true 1618 } 1619 } 1620 if !gotResult { 1621 t.Fatalf("execution didn't return expected values") 1622 } 1623} 1624 1625// Test normal txReadEnv method on BatchReadOnlyTransaction. 1626func TestIntegration_BROTNormal(t *testing.T) { 1627 // Set up testing environment and create txn. 1628 var ( 1629 txn *BatchReadOnlyTransaction 1630 err error 1631 row *Row 1632 i int64 1633 ) 1634 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1635 defer cancel() 1636 client, _, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements) 1637 defer cleanup() 1638 1639 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1640 t.Fatal(err) 1641 } 1642 defer txn.Cleanup(ctx) 1643 if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 1644 t.Fatal(err) 1645 } 1646 // Normal query should work with BatchReadOnlyTransaction. 1647 stmt2 := Statement{SQL: "SELECT 1"} 1648 iter := txn.Query(ctx, stmt2) 1649 defer iter.Stop() 1650 1651 row, err = iter.Next() 1652 if err != nil { 1653 t.Errorf("query failed with %v", err) 1654 } 1655 if err = row.Columns(&i); err != nil { 1656 t.Errorf("failed to parse row %v", err) 1657 } 1658} 1659 1660func TestIntegration_CommitTimestamp(t *testing.T) { 1661 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1662 defer cancel() 1663 client, _, cleanup := prepareIntegrationTest(ctx, t, ctsDBStatements) 1664 defer cleanup() 1665 1666 type testTableRow struct { 1667 Key string 1668 Ts NullTime 1669 } 1670 1671 var ( 1672 cts1, cts2, ts1, ts2 time.Time 1673 err error 1674 ) 1675 1676 // Apply mutation in sequence, expect to see commit timestamp in good order, 1677 // check also the commit timestamp returned 1678 for _, it := range []struct { 1679 k string 1680 t *time.Time 1681 }{ 1682 {"a", &cts1}, 1683 {"b", &cts2}, 1684 } { 1685 tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}} 1686 m, err := InsertStruct("TestTable", tt) 1687 if err != nil { 1688 t.Fatal(err) 1689 } 1690 *it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()) 1691 if err != nil { 1692 t.Fatal(err) 1693 } 1694 } 1695 1696 txn := client.ReadOnlyTransaction() 1697 for _, it := range []struct { 1698 k string 1699 t *time.Time 1700 }{ 1701 {"a", &ts1}, 1702 {"b", &ts2}, 1703 } { 1704 if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil { 1705 t.Fatal(err) 1706 } else { 1707 var got testTableRow 1708 if err := r.ToStruct(&got); err != nil { 1709 t.Fatal(err) 1710 } 1711 *it.t = got.Ts.Time 1712 } 1713 } 1714 if !cts1.Equal(ts1) { 1715 t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1) 1716 } 1717 if !cts2.Equal(ts2) { 1718 t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2) 1719 } 1720 1721 // Try writing a timestamp in the future to commit timestamp, expect error. 1722 _, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce()) 1723 if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok { 1724 t.Error(msg) 1725 } 1726} 1727 1728func TestIntegration_DML(t *testing.T) { 1729 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1730 defer cancel() 1731 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1732 defer cleanup() 1733 1734 // Function that reads a single row's first name from within a transaction. 1735 readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) { 1736 row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"}) 1737 if err != nil { 1738 return "", err 1739 } 1740 var fn string 1741 if err := row.Column(0, &fn); err != nil { 1742 return "", err 1743 } 1744 return fn, nil 1745 } 1746 1747 // Function that reads multiple rows' first names from outside a read/write 1748 // transaction. 1749 readFirstNames := func(keys ...int) []string { 1750 var ks []KeySet 1751 for _, k := range keys { 1752 ks = append(ks, Key{k}) 1753 } 1754 iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"}) 1755 var got []string 1756 var fn string 1757 err := iter.Do(func(row *Row) error { 1758 if err := row.Column(0, &fn); err != nil { 1759 return err 1760 } 1761 got = append(got, fn) 1762 return nil 1763 }) 1764 if err != nil { 1765 t.Fatalf("readFirstNames(%v): %v", keys, err) 1766 } 1767 return got 1768 } 1769 1770 // Use ReadWriteTransaction.Query to execute a DML statement. 1771 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1772 iter := tx.Query(ctx, Statement{ 1773 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`, 1774 }) 1775 defer iter.Stop() 1776 if row, err := iter.Next(); err != iterator.Done { 1777 t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err) 1778 } 1779 if iter.RowCount != 1 { 1780 t.Errorf("row count: got %d, want 1", iter.RowCount) 1781 } 1782 // The results of the DML statement should be visible to the transaction. 1783 got, err := readFirstName(tx, 1) 1784 if err != nil { 1785 return err 1786 } 1787 if want := "Umm"; got != want { 1788 t.Errorf("got %q, want %q", got, want) 1789 } 1790 return nil 1791 }) 1792 if err != nil { 1793 t.Fatal(err) 1794 } 1795 1796 // Use ReadWriteTransaction.Update to execute a DML statement. 1797 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1798 count, err := tx.Update(ctx, Statement{ 1799 SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`, 1800 }) 1801 if err != nil { 1802 t.Fatal(err) 1803 } 1804 if count != 1 { 1805 t.Errorf("row count: got %d, want 1", count) 1806 } 1807 got, err := readFirstName(tx, 2) 1808 if err != nil { 1809 return err 1810 } 1811 if want := "Eduard"; got != want { 1812 t.Errorf("got %q, want %q", got, want) 1813 } 1814 return nil 1815 1816 }) 1817 if err != nil { 1818 t.Fatal(err) 1819 } 1820 1821 // Roll back a DML statement and confirm that it didn't happen. 1822 var fail = errors.New("fail") 1823 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1824 _, err := tx.Update(ctx, Statement{ 1825 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 1826 }) 1827 if err != nil { 1828 return err 1829 } 1830 return fail 1831 }) 1832 if err != fail { 1833 t.Fatalf("rolling back: got error %v, want the error 'fail'", err) 1834 } 1835 _, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 1836 if got, want := ErrCode(err), codes.NotFound; got != want { 1837 t.Errorf("got %s, want %s", got, want) 1838 } 1839 1840 // Run two DML statements in the same transaction. 1841 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1842 _, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`}) 1843 if err != nil { 1844 return err 1845 } 1846 _, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`}) 1847 if err != nil { 1848 return err 1849 } 1850 return nil 1851 }) 1852 if err != nil { 1853 t.Fatal(err) 1854 } 1855 got := readFirstNames(1, 2) 1856 want := []string{"Oum", "Eddie"} 1857 if !testEqual(got, want) { 1858 t.Errorf("got %v, want %v", got, want) 1859 } 1860 1861 // Run a DML statement and an ordinary mutation in the same transaction. 1862 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1863 _, err := tx.Update(ctx, Statement{ 1864 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 1865 }) 1866 if err != nil { 1867 return err 1868 } 1869 return tx.BufferWrite([]*Mutation{ 1870 Insert("Singers", []string{"SingerId", "FirstName", "LastName"}, 1871 []interface{}{4, "Andy", "Irvine"}), 1872 }) 1873 }) 1874 if err != nil { 1875 t.Fatal(err) 1876 } 1877 got = readFirstNames(3, 4) 1878 want = []string{"Audra", "Andy"} 1879 if !testEqual(got, want) { 1880 t.Errorf("got %v, want %v", got, want) 1881 } 1882 1883 // Attempt to run a query using update. 1884 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1885 _, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`}) 1886 return err 1887 }) 1888 if got, want := ErrCode(err), codes.InvalidArgument; got != want { 1889 t.Errorf("got %s, want %s", got, want) 1890 } 1891} 1892 1893func TestIntegration_StructParametersBind(t *testing.T) { 1894 t.Parallel() 1895 ctx := context.Background() 1896 client, _, cleanup := prepareIntegrationTest(ctx, t, nil) 1897 defer cleanup() 1898 1899 type tRow []interface{} 1900 type tRows []struct{ trow tRow } 1901 1902 type allFields struct { 1903 Stringf string 1904 Intf int 1905 Boolf bool 1906 Floatf float64 1907 Bytef []byte 1908 Timef time.Time 1909 Datef civil.Date 1910 } 1911 allColumns := []string{ 1912 "Stringf", 1913 "Intf", 1914 "Boolf", 1915 "Floatf", 1916 "Bytef", 1917 "Timef", 1918 "Datef", 1919 } 1920 s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1} 1921 s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2} 1922 1923 dynamicStructType := reflect.StructOf([]reflect.StructField{ 1924 {Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`}, 1925 }) 1926 s3 := reflect.New(dynamicStructType) 1927 s3.Elem().Field(0).Set(reflect.ValueOf(t1)) 1928 1929 for i, test := range []struct { 1930 param interface{} 1931 sql string 1932 cols []string 1933 trows tRows 1934 }{ 1935 // Struct value. 1936 { 1937 s1, 1938 "SELECT" + 1939 " @p.Stringf," + 1940 " @p.Intf," + 1941 " @p.Boolf," + 1942 " @p.Floatf," + 1943 " @p.Bytef," + 1944 " @p.Timef," + 1945 " @p.Datef", 1946 allColumns, 1947 tRows{ 1948 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 1949 }, 1950 }, 1951 // Array of struct value. 1952 { 1953 []allFields{s1, s2}, 1954 "SELECT * FROM UNNEST(@p)", 1955 allColumns, 1956 tRows{ 1957 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 1958 {tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}}, 1959 }, 1960 }, 1961 // Null struct. 1962 { 1963 (*allFields)(nil), 1964 "SELECT @p IS NULL", 1965 []string{""}, 1966 tRows{ 1967 {tRow{true}}, 1968 }, 1969 }, 1970 // Null Array of struct. 1971 { 1972 []allFields(nil), 1973 "SELECT @p IS NULL", 1974 []string{""}, 1975 tRows{ 1976 {tRow{true}}, 1977 }, 1978 }, 1979 // Empty struct. 1980 { 1981 struct{}{}, 1982 "SELECT @p IS NULL ", 1983 []string{""}, 1984 tRows{ 1985 {tRow{false}}, 1986 }, 1987 }, 1988 // Empty array of struct. 1989 { 1990 []allFields{}, 1991 "SELECT * FROM UNNEST(@p) ", 1992 allColumns, 1993 tRows{}, 1994 }, 1995 // Struct with duplicate fields. 1996 { 1997 struct { 1998 A int `spanner:"field"` 1999 B int `spanner:"field"` 2000 }{10, 20}, 2001 "SELECT * FROM UNNEST([@p]) ", 2002 []string{"field", "field"}, 2003 tRows{ 2004 {tRow{10, 20}}, 2005 }, 2006 }, 2007 // Struct with unnamed fields. 2008 { 2009 struct { 2010 A string `spanner:""` 2011 }{"hello"}, 2012 "SELECT * FROM UNNEST([@p]) ", 2013 []string{""}, 2014 tRows{ 2015 {tRow{"hello"}}, 2016 }, 2017 }, 2018 // Mixed struct. 2019 { 2020 struct { 2021 DynamicStructField interface{} `spanner:"f1"` 2022 ArrayStructField []*allFields `spanner:"f2"` 2023 }{ 2024 DynamicStructField: s3.Interface(), 2025 ArrayStructField: []*allFields{nil}, 2026 }, 2027 "SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ", 2028 []string{"ff1", "", ""}, 2029 tRows{ 2030 {tRow{t1, 1, true}}, 2031 }, 2032 }, 2033 } { 2034 iter := client.Single().Query(ctx, Statement{ 2035 SQL: test.sql, 2036 Params: map[string]interface{}{"p": test.param}, 2037 }) 2038 var gotRows []*Row 2039 err := iter.Do(func(r *Row) error { 2040 gotRows = append(gotRows, r) 2041 return nil 2042 }) 2043 if err != nil { 2044 t.Errorf("Failed to execute test case %d, error: %v", i, err) 2045 } 2046 2047 var wantRows []*Row 2048 for j, row := range test.trows { 2049 r, err := NewRow(test.cols, row.trow) 2050 if err != nil { 2051 t.Errorf("Invalid row %d in test case %d", j, i) 2052 } 2053 wantRows = append(wantRows, r) 2054 } 2055 if !testEqual(gotRows, wantRows) { 2056 t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows) 2057 } 2058 } 2059} 2060 2061func TestIntegration_PDML(t *testing.T) { 2062 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2063 defer cancel() 2064 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2065 defer cleanup() 2066 2067 columns := []string{"SingerId", "FirstName", "LastName"} 2068 2069 // Populate the Singers table. 2070 var muts []*Mutation 2071 for _, row := range [][]interface{}{ 2072 {1, "Umm", "Kulthum"}, 2073 {2, "Eduard", "Khil"}, 2074 {3, "Audra", "McDonald"}, 2075 } { 2076 muts = append(muts, Insert("Singers", columns, row)) 2077 } 2078 if _, err := client.Apply(ctx, muts); err != nil { 2079 t.Fatal(err) 2080 } 2081 // Identifiers in PDML statements must be fully qualified. 2082 // TODO(jba): revisit the above. 2083 count, err := client.PartitionedUpdate(ctx, Statement{ 2084 SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= 3`, 2085 }) 2086 if err != nil { 2087 t.Fatal(err) 2088 } 2089 if want := int64(3); count != want { 2090 t.Errorf("got %d, want %d", count, want) 2091 } 2092 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2093 if err != nil { 2094 t.Fatal(err) 2095 } 2096 want := [][]interface{}{ 2097 {int64(1), "changed", "Kulthum"}, 2098 {int64(2), "changed", "Khil"}, 2099 {int64(3), "changed", "McDonald"}, 2100 } 2101 if !testEqual(got, want) { 2102 t.Errorf("\ngot %v\nwant%v", got, want) 2103 } 2104} 2105 2106func TestBatchDML(t *testing.T) { 2107 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2108 defer cancel() 2109 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2110 defer cleanup() 2111 2112 columns := []string{"SingerId", "FirstName", "LastName"} 2113 2114 // Populate the Singers table. 2115 var muts []*Mutation 2116 for _, row := range [][]interface{}{ 2117 {1, "Umm", "Kulthum"}, 2118 {2, "Eduard", "Khil"}, 2119 {3, "Audra", "McDonald"}, 2120 } { 2121 muts = append(muts, Insert("Singers", columns, row)) 2122 } 2123 if _, err := client.Apply(ctx, muts); err != nil { 2124 t.Fatal(err) 2125 } 2126 2127 var counts []int64 2128 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2129 counts, err = tx.BatchUpdate(ctx, []Statement{ 2130 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2131 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2132 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2133 }) 2134 return err 2135 }) 2136 2137 if err != nil { 2138 t.Fatal(err) 2139 } 2140 if want := []int64{1, 1, 1}; !testEqual(counts, want) { 2141 t.Fatalf("got %d, want %d", counts, want) 2142 } 2143 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2144 if err != nil { 2145 t.Fatal(err) 2146 } 2147 want := [][]interface{}{ 2148 {int64(1), "changed 1", "Kulthum"}, 2149 {int64(2), "changed 2", "Khil"}, 2150 {int64(3), "changed 3", "McDonald"}, 2151 } 2152 if !testEqual(got, want) { 2153 t.Errorf("\ngot %v\nwant%v", got, want) 2154 } 2155} 2156 2157func TestBatchDML_NoStatements(t *testing.T) { 2158 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2159 defer cancel() 2160 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2161 defer cleanup() 2162 2163 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2164 _, err = tx.BatchUpdate(ctx, []Statement{}) 2165 return err 2166 }) 2167 if err == nil { 2168 t.Fatal("expected error, got nil") 2169 } 2170 if s, ok := status.FromError(err); ok { 2171 if s.Code() != codes.InvalidArgument { 2172 t.Fatalf("expected InvalidArgument, got %v", err) 2173 } 2174 } else { 2175 t.Fatalf("expected InvalidArgument, got %v", err) 2176 } 2177} 2178 2179func TestBatchDML_TwoStatements(t *testing.T) { 2180 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2181 defer cancel() 2182 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2183 defer cleanup() 2184 2185 columns := []string{"SingerId", "FirstName", "LastName"} 2186 2187 // Populate the Singers table. 2188 var muts []*Mutation 2189 for _, row := range [][]interface{}{ 2190 {1, "Umm", "Kulthum"}, 2191 {2, "Eduard", "Khil"}, 2192 {3, "Audra", "McDonald"}, 2193 } { 2194 muts = append(muts, Insert("Singers", columns, row)) 2195 } 2196 if _, err := client.Apply(ctx, muts); err != nil { 2197 t.Fatal(err) 2198 } 2199 2200 var updateCount int64 2201 var batchCounts []int64 2202 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2203 batchCounts, err = tx.BatchUpdate(ctx, []Statement{ 2204 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2205 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2206 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2207 }) 2208 if err != nil { 2209 return err 2210 } 2211 2212 updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}) 2213 return err 2214 }) 2215 if err != nil { 2216 t.Fatal(err) 2217 } 2218 if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) { 2219 t.Fatalf("got %d, want %d", batchCounts, want) 2220 } 2221 if updateCount != 1 { 2222 t.Fatalf("got %v, want 1", updateCount) 2223 } 2224} 2225 2226// TODO(deklerk): this currently does not work because the transaction appears to 2227// get rolled back after a single statement fails. b/120158761 2228func TestBatchDML_Error(t *testing.T) { 2229 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2230 defer cancel() 2231 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2232 defer cleanup() 2233 2234 columns := []string{"SingerId", "FirstName", "LastName"} 2235 2236 // Populate the Singers table. 2237 var muts []*Mutation 2238 for _, row := range [][]interface{}{ 2239 {1, "Umm", "Kulthum"}, 2240 {2, "Eduard", "Khil"}, 2241 {3, "Audra", "McDonald"}, 2242 } { 2243 muts = append(muts, Insert("Singers", columns, row)) 2244 } 2245 if _, err := client.Apply(ctx, muts); err != nil { 2246 t.Fatal(err) 2247 } 2248 2249 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2250 counts, err := tx.BatchUpdate(ctx, []Statement{ 2251 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2252 {SQL: `some illegal statement`}, 2253 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2254 }) 2255 if err == nil { 2256 t.Fatal("expected err, got nil") 2257 } 2258 if want := []int64{1}; !testEqual(counts, want) { 2259 t.Fatalf("got %d, want %d", counts, want) 2260 } 2261 2262 got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns)) 2263 if err != nil { 2264 t.Fatal(err) 2265 } 2266 want := [][]interface{}{ 2267 {int64(1), "changed 1", "Kulthum"}, 2268 {int64(2), "Eduard", "Khil"}, 2269 {int64(3), "Audra", "McDonald"}, 2270 } 2271 if !testEqual(got, want) { 2272 t.Errorf("\ngot %v\nwant%v", got, want) 2273 } 2274 2275 return nil 2276 }) 2277 if err != nil { 2278 t.Fatal(err) 2279 } 2280} 2281 2282// Prepare initializes Cloud Spanner testing DB and clients. 2283func prepareIntegrationTest(ctx context.Context, t *testing.T, statements []string) (*Client, string, func()) { 2284 if admin == nil { 2285 t.Skip("Integration tests skipped") 2286 } 2287 // Construct a unique test DB name. 2288 dbName := dbNameSpace.New() 2289 2290 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName) 2291 // Create database and tables. 2292 op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 2293 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 2294 CreateStatement: "CREATE DATABASE " + dbName, 2295 ExtraStatements: statements, 2296 }) 2297 if err != nil { 2298 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2299 } 2300 if _, err := op.Wait(ctx); err != nil { 2301 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2302 } 2303 client, err := createClient(ctx, dbPath) 2304 if err != nil { 2305 t.Fatalf("cannot create data client on DB %v: %v", dbPath, err) 2306 } 2307 return client, dbPath, func() { 2308 client.Close() 2309 if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 2310 t.Logf("failed to drop database %s (error %v), might need a manual removal", 2311 dbPath, err) 2312 } 2313 } 2314} 2315 2316func cleanupDatabases() { 2317 if admin == nil { 2318 // Integration tests skipped. 2319 return 2320 } 2321 2322 ctx := context.Background() 2323 dbsParent := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID) 2324 dbsIter := admin.ListDatabases(ctx, &adminpb.ListDatabasesRequest{Parent: dbsParent}) 2325 expireAge := 24 * time.Hour 2326 2327 for { 2328 db, err := dbsIter.Next() 2329 if err == iterator.Done { 2330 break 2331 } 2332 if err != nil { 2333 panic(err) 2334 } 2335 // TODO(deklerk): When we have the ability to programmatically create 2336 // instances, we can create an instance with uid.New and delete all 2337 // tables in it. For now, we rely on matching prefixes. 2338 if dbNameSpace.Older(db.Name, expireAge) { 2339 log.Printf("Dropping database %s", db.Name) 2340 2341 if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: db.Name}); err != nil { 2342 log.Printf("failed to drop database %s (error %v), might need a manual removal", 2343 db.Name, err) 2344 } 2345 } 2346 } 2347} 2348 2349func rangeReads(ctx context.Context, t *testing.T, client *Client) { 2350 checkRange := func(ks KeySet, wantNums ...int) { 2351 if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok { 2352 t.Errorf("key set %+v: %s", ks, msg) 2353 } 2354 } 2355 2356 checkRange(Key{"k1"}, 1) 2357 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4) 2358 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5) 2359 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5) 2360 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4) 2361 2362 // Partial key specification. 2363 checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9) 2364 checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9) 2365 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10) 2366 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11) 2367 2368 // The following produce empty ranges. 2369 // TODO(jba): Consider a multi-part key to illustrate partial key behavior. 2370 // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen}) 2371 // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen}) 2372 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen}) 2373 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed}) 2374 2375 // Prefix is component-wise, not string prefix. 2376 checkRange(Key{"k1"}.AsPrefix(), 1) 2377 checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2378 2379 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2380} 2381 2382func indexRangeReads(ctx context.Context, t *testing.T, client *Client) { 2383 checkRange := func(ks KeySet, wantNums ...int) { 2384 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns), 2385 wantNums); !ok { 2386 t.Errorf("key set %+v: %s", ks, msg) 2387 } 2388 } 2389 2390 checkRange(Key{"v1"}, 1) 2391 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4) 2392 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5) 2393 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5) 2394 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4) 2395 2396 // // Partial key specification. 2397 checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9) 2398 checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9) 2399 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10) 2400 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11) 2401 2402 // // The following produce empty ranges. 2403 // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen}) 2404 // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen}) 2405 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen}) 2406 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed}) 2407 2408 // // Prefix is component-wise, not string prefix. 2409 checkRange(Key{"v1"}.AsPrefix(), 1) 2410 checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2411 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2412 2413 // Read from an index with DESC ordering. 2414 wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} 2415 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns), 2416 wantNums); !ok { 2417 t.Errorf("desc: %s", msg) 2418 } 2419} 2420 2421type testTableRow struct{ Key, StringValue string } 2422 2423func compareRows(iter *RowIterator, wantNums []int) (string, bool) { 2424 rows, err := readAllTestTable(iter) 2425 if err != nil { 2426 return err.Error(), false 2427 } 2428 want := map[string]string{} 2429 for _, n := range wantNums { 2430 want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n) 2431 } 2432 got := map[string]string{} 2433 for _, r := range rows { 2434 got[r.Key] = r.StringValue 2435 } 2436 if !testEqual(got, want) { 2437 return fmt.Sprintf("got %v, want %v", got, want), false 2438 } 2439 return "", true 2440} 2441 2442func isNaN(x interface{}) bool { 2443 f, ok := x.(float64) 2444 if !ok { 2445 return false 2446 } 2447 return math.IsNaN(f) 2448} 2449 2450// createClient creates Cloud Spanner data client. 2451func createClient(ctx context.Context, dbPath string) (client *Client, err error) { 2452 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{ 2453 SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.2}, 2454 }, option.WithTokenSource(testutil.TokenSource(ctx, Scope)), option.WithEndpoint(endpoint)) 2455 if err != nil { 2456 return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) 2457 } 2458 return client, nil 2459} 2460 2461// populate prepares the database with some data. 2462func populate(ctx context.Context, client *Client) error { 2463 // Populate data 2464 var err error 2465 m := InsertMap("test", map[string]interface{}{ 2466 "a": str1, 2467 "b": str2, 2468 }) 2469 _, err = client.Apply(ctx, []*Mutation{m}) 2470 return err 2471} 2472 2473func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) { 2474 if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) { 2475 return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false 2476 } 2477 return "", true 2478} 2479 2480func rowToValues(r *Row) ([]interface{}, error) { 2481 var x int64 2482 var y, z string 2483 if err := r.Column(0, &x); err != nil { 2484 return nil, err 2485 } 2486 if err := r.Column(1, &y); err != nil { 2487 return nil, err 2488 } 2489 if err := r.Column(2, &z); err != nil { 2490 return nil, err 2491 } 2492 return []interface{}{x, y, z}, nil 2493} 2494 2495func readAll(iter *RowIterator) ([][]interface{}, error) { 2496 defer iter.Stop() 2497 var vals [][]interface{} 2498 for { 2499 row, err := iter.Next() 2500 if err == iterator.Done { 2501 return vals, nil 2502 } 2503 if err != nil { 2504 return nil, err 2505 } 2506 v, err := rowToValues(row) 2507 if err != nil { 2508 return nil, err 2509 } 2510 vals = append(vals, v) 2511 } 2512} 2513 2514func readAllTestTable(iter *RowIterator) ([]testTableRow, error) { 2515 defer iter.Stop() 2516 var vals []testTableRow 2517 for { 2518 row, err := iter.Next() 2519 if err == iterator.Done { 2520 return vals, nil 2521 } 2522 if err != nil { 2523 return nil, err 2524 } 2525 var ttr testTableRow 2526 if err := row.ToStruct(&ttr); err != nil { 2527 return nil, err 2528 } 2529 vals = append(vals, ttr) 2530 } 2531} 2532