1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "log" 25 "math" 26 "os" 27 "reflect" 28 "strings" 29 "sync" 30 "testing" 31 "time" 32 33 "cloud.google.com/go/civil" 34 "cloud.google.com/go/internal/testutil" 35 "cloud.google.com/go/internal/uid" 36 database "cloud.google.com/go/spanner/admin/database/apiv1" 37 "google.golang.org/api/iterator" 38 "google.golang.org/api/option" 39 adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" 40 "google.golang.org/grpc/codes" 41 "google.golang.org/grpc/status" 42) 43 44var ( 45 // testProjectID specifies the project used for testing. 46 // It can be changed by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. 47 testProjectID = testutil.ProjID() 48 49 dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true}) 50 51 // TODO(deklerk) When we can programmatically create instances, we should use 52 // uid.New as the test instance name. 53 // testInstanceID specifies the Cloud Spanner instance used for testing. 54 testInstanceID = "go-integration-test" 55 56 testTable = "TestTable" 57 testTableIndex = "TestTableByValue" 58 testTableColumns = []string{"Key", "StringValue"} 59 60 // admin is a spanner.DatabaseAdminClient. 61 admin *database.DatabaseAdminClient 62 63 singerDBStatements = []string{ 64 `CREATE TABLE Singers ( 65 SingerId INT64 NOT NULL, 66 FirstName STRING(1024), 67 LastName STRING(1024), 68 SingerInfo BYTES(MAX) 69 ) PRIMARY KEY (SingerId)`, 70 `CREATE INDEX SingerByName ON Singers(FirstName, LastName)`, 71 `CREATE TABLE Accounts ( 72 AccountId INT64 NOT NULL, 73 Nickname STRING(100), 74 Balance INT64 NOT NULL, 75 ) PRIMARY KEY (AccountId)`, 76 `CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`, 77 `CREATE TABLE Types ( 78 RowID INT64 NOT NULL, 79 String STRING(MAX), 80 StringArray ARRAY<STRING(MAX)>, 81 Bytes BYTES(MAX), 82 BytesArray ARRAY<BYTES(MAX)>, 83 Int64a INT64, 84 Int64Array ARRAY<INT64>, 85 Bool BOOL, 86 BoolArray ARRAY<BOOL>, 87 Float64 FLOAT64, 88 Float64Array ARRAY<FLOAT64>, 89 Date DATE, 90 DateArray ARRAY<DATE>, 91 Timestamp TIMESTAMP, 92 TimestampArray ARRAY<TIMESTAMP>, 93 ) PRIMARY KEY (RowID)`, 94 } 95 96 readDBStatements = []string{ 97 `CREATE TABLE TestTable ( 98 Key STRING(MAX) NOT NULL, 99 StringValue STRING(MAX) 100 ) PRIMARY KEY (Key)`, 101 `CREATE INDEX TestTableByValue ON TestTable(StringValue)`, 102 `CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`, 103 } 104 105 simpleDBStatements = []string{ 106 `CREATE TABLE test ( 107 a STRING(1024), 108 b STRING(1024), 109 ) PRIMARY KEY (a)`, 110 } 111 simpleDBTableColumns = []string{"a", "b"} 112 113 ctsDBStatements = []string{ 114 `CREATE TABLE TestTable ( 115 Key STRING(MAX) NOT NULL, 116 Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true), 117 ) PRIMARY KEY (Key)`, 118 } 119) 120 121const ( 122 str1 = "alice" 123 str2 = "a@example.com" 124) 125 126func TestMain(m *testing.M) { 127 cleanup := initIntegrationTests() 128 res := m.Run() 129 cleanup() 130 os.Exit(res) 131} 132 133func initIntegrationTests() func() { 134 ctx := context.Background() 135 flag.Parse() // needed for testing.Short() 136 noop := func() {} 137 138 if testing.Short() { 139 log.Println("Integration tests skipped in -short mode.") 140 return noop 141 } 142 143 if testProjectID == "" { 144 log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 145 return noop 146 } 147 148 ts := testutil.TokenSource(ctx, AdminScope, Scope) 149 if ts == nil { 150 log.Printf("Integration test skipped: cannot get service account credential from environment variable %v", "GCLOUD_TESTS_GOLANG_KEY") 151 return noop 152 } 153 var err error 154 155 // Create Admin client and Data client. 156 admin, err = database.NewDatabaseAdminClient(ctx, option.WithTokenSource(ts), option.WithEndpoint(endpoint)) 157 if err != nil { 158 log.Fatalf("cannot create admin client: %v", err) 159 } 160 161 return func() { 162 cleanupDatabases() 163 admin.Close() 164 } 165} 166 167// Test SingleUse transaction. 168func TestIntegration_SingleUse(t *testing.T) { 169 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) 170 defer cancel() 171 // Set up testing environment. 172 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 173 defer cleanup() 174 175 writes := []struct { 176 row []interface{} 177 ts time.Time 178 }{ 179 {row: []interface{}{1, "Marc", "Foo"}}, 180 {row: []interface{}{2, "Tars", "Bar"}}, 181 {row: []interface{}{3, "Alpha", "Beta"}}, 182 {row: []interface{}{4, "Last", "End"}}, 183 } 184 // Try to write four rows through the Apply API. 185 for i, w := range writes { 186 var err error 187 m := InsertOrUpdate("Singers", 188 []string{"SingerId", "FirstName", "LastName"}, 189 w.row) 190 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 191 t.Fatal(err) 192 } 193 } 194 195 // For testing timestamp bound staleness. 196 <-time.After(time.Second) 197 198 // Test reading rows with different timestamp bounds. 199 for i, test := range []struct { 200 want [][]interface{} 201 tb TimestampBound 202 checkTs func(time.Time) error 203 }{ 204 { 205 // strong 206 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 207 StrongRead(), 208 func(ts time.Time) error { 209 // writes[3] is the last write, all subsequent strong read should have a timestamp larger than that. 210 if ts.Before(writes[3].ts) { 211 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 212 } 213 return nil 214 }, 215 }, 216 { 217 // min_read_timestamp 218 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 219 MinReadTimestamp(writes[3].ts), 220 func(ts time.Time) error { 221 if ts.Before(writes[3].ts) { 222 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 223 } 224 return nil 225 }, 226 }, 227 { 228 // max_staleness 229 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 230 MaxStaleness(time.Second), 231 func(ts time.Time) error { 232 if ts.Before(writes[3].ts) { 233 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 234 } 235 return nil 236 }, 237 }, 238 { 239 // read_timestamp 240 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 241 ReadTimestamp(writes[2].ts), 242 func(ts time.Time) error { 243 if ts != writes[2].ts { 244 return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts) 245 } 246 return nil 247 }, 248 }, 249 { 250 // exact_staleness 251 nil, 252 // Specify a staleness which should be already before this test because 253 // context timeout is set to be 10s. 254 ExactStaleness(11 * time.Second), 255 func(ts time.Time) error { 256 if ts.After(writes[0].ts) { 257 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 258 } 259 return nil 260 }, 261 }, 262 } { 263 // SingleUse.Query 264 su := client.Single().WithTimestampBound(test.tb) 265 got, err := readAll(su.Query( 266 ctx, 267 Statement{ 268 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 269 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 270 })) 271 if err != nil { 272 t.Errorf("%d: SingleUse.Query returns error %v, want nil", i, err) 273 } 274 if !testEqual(got, test.want) { 275 t.Errorf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want) 276 } 277 rts, err := su.Timestamp() 278 if err != nil { 279 t.Errorf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) 280 } 281 if err := test.checkTs(rts); err != nil { 282 t.Errorf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err) 283 } 284 // SingleUse.Read 285 su = client.Single().WithTimestampBound(test.tb) 286 got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 287 if err != nil { 288 t.Errorf("%d: SingleUse.Read returns error %v, want nil", i, err) 289 } 290 if !testEqual(got, test.want) { 291 t.Errorf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want) 292 } 293 rts, err = su.Timestamp() 294 if err != nil { 295 t.Errorf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) 296 } 297 if err := test.checkTs(rts); err != nil { 298 t.Errorf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err) 299 } 300 // SingleUse.ReadRow 301 got = nil 302 for _, k := range []Key{{1}, {3}, {4}} { 303 su = client.Single().WithTimestampBound(test.tb) 304 r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 305 if err != nil { 306 continue 307 } 308 v, err := rowToValues(r) 309 if err != nil { 310 continue 311 } 312 got = append(got, v) 313 rts, err = su.Timestamp() 314 if err != nil { 315 t.Errorf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 316 } 317 if err := test.checkTs(rts); err != nil { 318 t.Errorf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 319 } 320 } 321 if !testEqual(got, test.want) { 322 t.Errorf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want) 323 } 324 // SingleUse.ReadUsingIndex 325 su = client.Single().WithTimestampBound(test.tb) 326 got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 327 if err != nil { 328 t.Errorf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) 329 } 330 // The results from ReadUsingIndex is sorted by the index rather than primary key. 331 if len(got) != len(test.want) { 332 t.Errorf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 333 } 334 for j, g := range got { 335 if j > 0 { 336 prev := got[j-1][1].(string) + got[j-1][2].(string) 337 curr := got[j][1].(string) + got[j][2].(string) 338 if strings.Compare(prev, curr) > 0 { 339 t.Errorf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 340 } 341 } 342 found := false 343 for _, w := range test.want { 344 if testEqual(g, w) { 345 found = true 346 } 347 } 348 if !found { 349 t.Errorf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) 350 break 351 } 352 } 353 rts, err = su.Timestamp() 354 if err != nil { 355 t.Errorf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 356 } 357 if err := test.checkTs(rts); err != nil { 358 t.Errorf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 359 } 360 } 361 362 // Reading with limit. 363 su := client.Single() 364 const limit = 1 365 gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), 366 []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit})) 367 if err != nil { 368 t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err) 369 } 370 if got, want := len(gotRows), limit; got != want { 371 t.Errorf("got %d, want %d", got, want) 372 } 373 374} 375 376// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it 377// also tests for a single timestamp across multiple reads. 378func TestIntegration_ReadOnlyTransaction(t *testing.T) { 379 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 380 defer cancel() 381 // Set up testing environment. 382 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 383 defer cleanup() 384 385 writes := []struct { 386 row []interface{} 387 ts time.Time 388 }{ 389 {row: []interface{}{1, "Marc", "Foo"}}, 390 {row: []interface{}{2, "Tars", "Bar"}}, 391 {row: []interface{}{3, "Alpha", "Beta"}}, 392 {row: []interface{}{4, "Last", "End"}}, 393 } 394 // Try to write four rows through the Apply API. 395 for i, w := range writes { 396 var err error 397 m := InsertOrUpdate("Singers", 398 []string{"SingerId", "FirstName", "LastName"}, 399 w.row) 400 if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 401 t.Fatal(err) 402 } 403 } 404 405 // For testing timestamp bound staleness. 406 <-time.After(time.Second) 407 408 // Test reading rows with different timestamp bounds. 409 for i, test := range []struct { 410 want [][]interface{} 411 tb TimestampBound 412 checkTs func(time.Time) error 413 }{ 414 // Note: min_read_timestamp and max_staleness are not supported by ReadOnlyTransaction. See 415 // API document for more details. 416 { 417 // strong 418 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}, 419 StrongRead(), 420 func(ts time.Time) error { 421 if ts.Before(writes[3].ts) { 422 return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts) 423 } 424 return nil 425 }, 426 }, 427 { 428 // read_timestamp 429 [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}}, 430 ReadTimestamp(writes[2].ts), 431 func(ts time.Time) error { 432 if ts != writes[2].ts { 433 return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts) 434 } 435 return nil 436 }, 437 }, 438 { 439 // exact_staleness 440 nil, 441 // Specify a staleness which should be already before this test because 442 // context timeout is set to be 10s. 443 ExactStaleness(11 * time.Second), 444 func(ts time.Time) error { 445 if ts.After(writes[0].ts) { 446 return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts) 447 } 448 return nil 449 }, 450 }, 451 } { 452 // ReadOnlyTransaction.Query 453 ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb) 454 got, err := readAll(ro.Query( 455 ctx, 456 Statement{ 457 "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)", 458 map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)}, 459 })) 460 if err != nil { 461 t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err) 462 } 463 if !testEqual(got, test.want) { 464 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want) 465 } 466 rts, err := ro.Timestamp() 467 if err != nil { 468 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err) 469 } 470 if err := test.checkTs(rts); err != nil { 471 t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err) 472 } 473 roTs := rts 474 // ReadOnlyTransaction.Read 475 got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"})) 476 if err != nil { 477 t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err) 478 } 479 if !testEqual(got, test.want) { 480 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want) 481 } 482 rts, err = ro.Timestamp() 483 if err != nil { 484 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err) 485 } 486 if err := test.checkTs(rts); err != nil { 487 t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err) 488 } 489 if roTs != rts { 490 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 491 } 492 // ReadOnlyTransaction.ReadRow 493 got = nil 494 for _, k := range []Key{{1}, {3}, {4}} { 495 r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"}) 496 if err != nil { 497 continue 498 } 499 v, err := rowToValues(r) 500 if err != nil { 501 continue 502 } 503 got = append(got, v) 504 rts, err = ro.Timestamp() 505 if err != nil { 506 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err) 507 } 508 if err := test.checkTs(rts); err != nil { 509 t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err) 510 } 511 if roTs != rts { 512 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 513 } 514 } 515 if !testEqual(got, test.want) { 516 t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want) 517 } 518 // SingleUse.ReadUsingIndex 519 got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"})) 520 if err != nil { 521 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err) 522 } 523 // The results from ReadUsingIndex is sorted by the index rather than primary key. 524 if len(got) != len(test.want) { 525 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 526 } 527 for j, g := range got { 528 if j > 0 { 529 prev := got[j-1][1].(string) + got[j-1][2].(string) 530 curr := got[j][1].(string) + got[j][2].(string) 531 if strings.Compare(prev, curr) > 0 { 532 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j]) 533 } 534 } 535 found := false 536 for _, w := range test.want { 537 if testEqual(g, w) { 538 found = true 539 } 540 } 541 if !found { 542 t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want) 543 break 544 } 545 } 546 rts, err = ro.Timestamp() 547 if err != nil { 548 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err) 549 } 550 if err := test.checkTs(rts); err != nil { 551 t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err) 552 } 553 if roTs != rts { 554 t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts) 555 } 556 ro.Close() 557 } 558} 559 560// Test ReadOnlyTransaction with different timestamp bound when there's an update at the same time. 561func TestIntegration_UpdateDuringRead(t *testing.T) { 562 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 563 defer cancel() 564 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 565 defer cleanup() 566 567 for i, tb := range []TimestampBound{ 568 StrongRead(), 569 ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour 570 ExactStaleness(time.Minute * 30), 571 } { 572 ro := client.ReadOnlyTransaction().WithTimestampBound(tb) 573 _, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 574 if ErrCode(err) != codes.NotFound { 575 t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err) 576 } 577 578 m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i}) 579 if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { 580 t.Fatal(err) 581 } 582 583 _, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"}) 584 if ErrCode(err) != codes.NotFound { 585 t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err) 586 } 587 } 588} 589 590// Test ReadWriteTransaction. 591func TestIntegration_ReadWriteTransaction(t *testing.T) { 592 // Give a longer deadline because of transaction backoffs. 593 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) 594 defer cancel() 595 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 596 defer cleanup() 597 598 // Set up two accounts 599 accounts := []*Mutation{ 600 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 601 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 602 } 603 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 604 t.Fatal(err) 605 } 606 wg := sync.WaitGroup{} 607 608 readBalance := func(iter *RowIterator) (int64, error) { 609 defer iter.Stop() 610 var bal int64 611 for { 612 row, err := iter.Next() 613 if err == iterator.Done { 614 return bal, nil 615 } 616 if err != nil { 617 return 0, err 618 } 619 if err := row.Column(0, &bal); err != nil { 620 return 0, err 621 } 622 } 623 } 624 625 for i := 0; i < 20; i++ { 626 wg.Add(1) 627 go func(iter int) { 628 defer wg.Done() 629 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 630 // Query Foo's balance and Bar's balance. 631 bf, e := readBalance(tx.Query(ctx, 632 Statement{"SELECT Balance FROM Accounts WHERE AccountId = @id", map[string]interface{}{"id": int64(1)}})) 633 if e != nil { 634 return e 635 } 636 bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) 637 if e != nil { 638 return e 639 } 640 if bf <= 0 { 641 return nil 642 } 643 bf-- 644 bb++ 645 return tx.BufferWrite([]*Mutation{ 646 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}), 647 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}), 648 }) 649 }) 650 if err != nil { 651 t.Errorf("%d: failed to execute transaction: %v", iter, err) 652 } 653 }(i) 654 } 655 // Because of context timeout, all goroutines will eventually return. 656 wg.Wait() 657 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 658 var bf, bb int64 659 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"}) 660 if e != nil { 661 return e 662 } 663 if ce := r.Column(0, &bf); ce != nil { 664 return ce 665 } 666 bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"})) 667 if e != nil { 668 return e 669 } 670 if bf != 30 || bb != 21 { 671 t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) 672 } 673 return nil 674 }) 675 if err != nil { 676 t.Errorf("failed to check balances: %v", err) 677 } 678} 679 680func TestIntegration_Reads(t *testing.T) { 681 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 682 defer cancel() 683 // Set up testing environment. 684 client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements) 685 defer cleanup() 686 687 // Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2". 688 var ms []*Mutation 689 for i := 0; i < 15; i++ { 690 ms = append(ms, InsertOrUpdate(testTable, 691 testTableColumns, 692 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 693 } 694 // Don't use ApplyAtLeastOnce, so we can test the other code path. 695 if _, err := client.Apply(ctx, ms); err != nil { 696 t.Fatal(err) 697 } 698 699 // Empty read. 700 rows, err := readAllTestTable(client.Single().Read(ctx, testTable, 701 KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns)) 702 if err != nil { 703 t.Fatal(err) 704 } 705 if got, want := len(rows), 0; got != want { 706 t.Errorf("got %d, want %d", got, want) 707 } 708 709 // Index empty read. 710 rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, 711 KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns)) 712 if err != nil { 713 t.Fatal(err) 714 } 715 if got, want := len(rows), 0; got != want { 716 t.Errorf("got %d, want %d", got, want) 717 } 718 719 // Point read. 720 row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns) 721 if err != nil { 722 t.Fatal(err) 723 } 724 var got testTableRow 725 if err := row.ToStruct(&got); err != nil { 726 t.Fatal(err) 727 } 728 if want := (testTableRow{"k1", "v1"}); got != want { 729 t.Errorf("got %v, want %v", got, want) 730 } 731 732 // Point read not found. 733 _, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns) 734 if ErrCode(err) != codes.NotFound { 735 t.Fatalf("got %v, want NotFound", err) 736 } 737 738 // No index point read not found, because Go does not have ReadRowUsingIndex. 739 740 rangeReads(ctx, t, client) 741 indexRangeReads(ctx, t, client) 742} 743 744func TestIntegration_EarlyTimestamp(t *testing.T) { 745 // Test that we can get the timestamp from a read-only transaction as 746 // soon as we have read at least one row. 747 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) 748 defer cancel() 749 // Set up testing environment. 750 client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements) 751 defer cleanup() 752 753 var ms []*Mutation 754 for i := 0; i < 3; i++ { 755 ms = append(ms, InsertOrUpdate(testTable, 756 testTableColumns, 757 []interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)})) 758 } 759 if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil { 760 t.Fatal(err) 761 } 762 763 txn := client.Single() 764 iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns) 765 defer iter.Stop() 766 // In single-use transaction, we should get an error before reading anything. 767 if _, err := txn.Timestamp(); err == nil { 768 t.Error("wanted error, got nil") 769 } 770 // After reading one row, the timestamp should be available. 771 _, err := iter.Next() 772 if err != nil { 773 t.Fatal(err) 774 } 775 if _, err := txn.Timestamp(); err != nil { 776 t.Errorf("got %v, want nil", err) 777 } 778 779 txn = client.ReadOnlyTransaction() 780 defer txn.Close() 781 iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns) 782 defer iter.Stop() 783 // In an ordinary read-only transaction, the timestamp should be 784 // available immediately. 785 if _, err := txn.Timestamp(); err != nil { 786 t.Errorf("got %v, want nil", err) 787 } 788} 789 790func TestIntegration_NestedTransaction(t *testing.T) { 791 // You cannot use a transaction from inside a read-write transaction. 792 ctx := context.Background() 793 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 794 defer cleanup() 795 796 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 797 _, err := client.ReadWriteTransaction(ctx, 798 func(context.Context, *ReadWriteTransaction) error { return nil }) 799 if ErrCode(err) != codes.FailedPrecondition { 800 t.Fatalf("got %v, want FailedPrecondition", err) 801 } 802 _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 803 if ErrCode(err) != codes.FailedPrecondition { 804 t.Fatalf("got %v, want FailedPrecondition", err) 805 } 806 rot := client.ReadOnlyTransaction() 807 defer rot.Close() 808 _, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"}) 809 if ErrCode(err) != codes.FailedPrecondition { 810 t.Fatalf("got %v, want FailedPrecondition", err) 811 } 812 return nil 813 }) 814 if err != nil { 815 t.Fatal(err) 816 } 817} 818 819// Test client recovery on database recreation. 820func TestIntegration_DbRemovalRecovery(t *testing.T) { 821 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) 822 defer cancel() 823 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 824 defer cleanup() 825 826 // Drop the testing database. 827 if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 828 t.Fatalf("failed to drop testing database %v: %v", dbPath, err) 829 } 830 831 // Now, send the query. 832 iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 833 defer iter.Stop() 834 if _, err := iter.Next(); err == nil { 835 t.Errorf("client sends query to removed database successfully, want it to fail") 836 } 837 838 // Recreate database and table. 839 dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] 840 op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 841 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 842 CreateStatement: "CREATE DATABASE " + dbName, 843 ExtraStatements: []string{ 844 `CREATE TABLE Singers ( 845 SingerId INT64 NOT NULL, 846 FirstName STRING(1024), 847 LastName STRING(1024), 848 SingerInfo BYTES(MAX) 849 ) PRIMARY KEY (SingerId)`, 850 }, 851 }) 852 if err != nil { 853 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 854 } 855 if _, err := op.Wait(ctx); err != nil { 856 t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err) 857 } 858 859 // Now, send the query again. 860 iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"}) 861 defer iter.Stop() 862 _, err = iter.Next() 863 if err != nil && err != iterator.Done { 864 t.Errorf("failed to send query to database %v: %v", dbPath, err) 865 } 866} 867 868// Test encoding/decoding non-struct Cloud Spanner types. 869func TestIntegration_BasicTypes(t *testing.T) { 870 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 871 defer cancel() 872 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 873 defer cleanup() 874 875 t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z") 876 // Boundaries 877 t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z") 878 t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z") 879 d1, _ := civil.ParseDate("2016-11-15") 880 // Boundaries 881 d2, _ := civil.ParseDate("0001-01-01") 882 d3, _ := civil.ParseDate("9999-12-31") 883 884 tests := []struct { 885 col string 886 val interface{} 887 want interface{} 888 }{ 889 {col: "String", val: ""}, 890 {col: "String", val: "", want: NullString{"", true}}, 891 {col: "String", val: "foo"}, 892 {col: "String", val: "foo", want: NullString{"foo", true}}, 893 {col: "String", val: NullString{"bar", true}, want: "bar"}, 894 {col: "String", val: NullString{"bar", false}, want: NullString{"", false}}, 895 {col: "String", val: nil, want: NullString{}}, 896 {col: "StringArray", val: []string(nil), want: []NullString(nil)}, 897 {col: "StringArray", val: []string{}, want: []NullString{}}, 898 {col: "StringArray", val: []string{"foo", "bar"}, want: []NullString{{"foo", true}, {"bar", true}}}, 899 {col: "StringArray", val: []NullString(nil)}, 900 {col: "StringArray", val: []NullString{}}, 901 {col: "StringArray", val: []NullString{{"foo", true}, {}}}, 902 {col: "Bytes", val: []byte{}}, 903 {col: "Bytes", val: []byte{1, 2, 3}}, 904 {col: "Bytes", val: []byte(nil)}, 905 {col: "BytesArray", val: [][]byte(nil)}, 906 {col: "BytesArray", val: [][]byte{}}, 907 {col: "BytesArray", val: [][]byte{{1}, {2, 3}}}, 908 {col: "Int64a", val: 0, want: int64(0)}, 909 {col: "Int64a", val: -1, want: int64(-1)}, 910 {col: "Int64a", val: 2, want: int64(2)}, 911 {col: "Int64a", val: int64(3)}, 912 {col: "Int64a", val: 4, want: NullInt64{4, true}}, 913 {col: "Int64a", val: NullInt64{5, true}, want: int64(5)}, 914 {col: "Int64a", val: NullInt64{6, true}, want: int64(6)}, 915 {col: "Int64a", val: NullInt64{7, false}, want: NullInt64{0, false}}, 916 {col: "Int64a", val: nil, want: NullInt64{}}, 917 {col: "Int64Array", val: []int(nil), want: []NullInt64(nil)}, 918 {col: "Int64Array", val: []int{}, want: []NullInt64{}}, 919 {col: "Int64Array", val: []int{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 920 {col: "Int64Array", val: []int64(nil), want: []NullInt64(nil)}, 921 {col: "Int64Array", val: []int64{}, want: []NullInt64{}}, 922 {col: "Int64Array", val: []int64{1, 2}, want: []NullInt64{{1, true}, {2, true}}}, 923 {col: "Int64Array", val: []NullInt64(nil)}, 924 {col: "Int64Array", val: []NullInt64{}}, 925 {col: "Int64Array", val: []NullInt64{{1, true}, {}}}, 926 {col: "Bool", val: false}, 927 {col: "Bool", val: true}, 928 {col: "Bool", val: false, want: NullBool{false, true}}, 929 {col: "Bool", val: true, want: NullBool{true, true}}, 930 {col: "Bool", val: NullBool{true, true}}, 931 {col: "Bool", val: NullBool{false, false}}, 932 {col: "Bool", val: nil, want: NullBool{}}, 933 {col: "BoolArray", val: []bool(nil), want: []NullBool(nil)}, 934 {col: "BoolArray", val: []bool{}, want: []NullBool{}}, 935 {col: "BoolArray", val: []bool{true, false}, want: []NullBool{{true, true}, {false, true}}}, 936 {col: "BoolArray", val: []NullBool(nil)}, 937 {col: "BoolArray", val: []NullBool{}}, 938 {col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}}, 939 {col: "Float64", val: 0.0}, 940 {col: "Float64", val: 3.14}, 941 {col: "Float64", val: math.NaN()}, 942 {col: "Float64", val: math.Inf(1)}, 943 {col: "Float64", val: math.Inf(-1)}, 944 {col: "Float64", val: 2.78, want: NullFloat64{2.78, true}}, 945 {col: "Float64", val: NullFloat64{2.71, true}, want: 2.71}, 946 {col: "Float64", val: NullFloat64{1.41, true}, want: NullFloat64{1.41, true}}, 947 {col: "Float64", val: NullFloat64{0, false}}, 948 {col: "Float64", val: nil, want: NullFloat64{}}, 949 {col: "Float64Array", val: []float64(nil), want: []NullFloat64(nil)}, 950 {col: "Float64Array", val: []float64{}, want: []NullFloat64{}}, 951 {col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, want: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}}, 952 {col: "Float64Array", val: []NullFloat64(nil)}, 953 {col: "Float64Array", val: []NullFloat64{}}, 954 {col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}}, 955 {col: "Date", val: d1}, 956 {col: "Date", val: d1, want: NullDate{d1, true}}, 957 {col: "Date", val: NullDate{d1, true}}, 958 {col: "Date", val: NullDate{d1, true}, want: d1}, 959 {col: "Date", val: NullDate{civil.Date{}, false}}, 960 {col: "DateArray", val: []civil.Date(nil), want: []NullDate(nil)}, 961 {col: "DateArray", val: []civil.Date{}, want: []NullDate{}}, 962 {col: "DateArray", val: []civil.Date{d1, d2, d3}, want: []NullDate{{d1, true}, {d2, true}, {d3, true}}}, 963 {col: "Timestamp", val: t1}, 964 {col: "Timestamp", val: t1, want: NullTime{t1, true}}, 965 {col: "Timestamp", val: NullTime{t1, true}}, 966 {col: "Timestamp", val: NullTime{t1, true}, want: t1}, 967 {col: "Timestamp", val: NullTime{}}, 968 {col: "Timestamp", val: nil, want: NullTime{}}, 969 {col: "TimestampArray", val: []time.Time(nil), want: []NullTime(nil)}, 970 {col: "TimestampArray", val: []time.Time{}, want: []NullTime{}}, 971 {col: "TimestampArray", val: []time.Time{t1, t2, t3}, want: []NullTime{{t1, true}, {t2, true}, {t3, true}}}, 972 } 973 974 // Write rows into table first. 975 var muts []*Mutation 976 for i, test := range tests { 977 muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val})) 978 } 979 if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil { 980 t.Fatal(err) 981 } 982 983 for i, test := range tests { 984 row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col}) 985 if err != nil { 986 t.Fatalf("Unable to fetch row %v: %v", i, err) 987 } 988 // Create new instance of type of test.want. 989 want := test.want 990 if want == nil { 991 want = test.val 992 } 993 gotp := reflect.New(reflect.TypeOf(want)) 994 if err := row.Column(0, gotp.Interface()); err != nil { 995 t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err) 996 continue 997 } 998 got := reflect.Indirect(gotp).Interface() 999 1000 // One of the test cases is checking NaN handling. Given 1001 // NaN!=NaN, we can't use reflect to test for it. 1002 if isNaN(got) && isNaN(want) { 1003 continue 1004 } 1005 1006 // Check non-NaN cases. 1007 if !testEqual(got, want) { 1008 t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want) 1009 continue 1010 } 1011 } 1012} 1013 1014// Test decoding Cloud Spanner STRUCT type. 1015func TestIntegration_StructTypes(t *testing.T) { 1016 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1017 defer cancel() 1018 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1019 defer cleanup() 1020 1021 tests := []struct { 1022 q Statement 1023 want func(r *Row) error 1024 }{ 1025 { 1026 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`}, 1027 want: func(r *Row) error { 1028 // Test STRUCT ARRAY decoding to []NullRow. 1029 var rows []NullRow 1030 if err := r.Column(0, &rows); err != nil { 1031 return err 1032 } 1033 if len(rows) != 1 { 1034 return fmt.Errorf("len(rows) = %d; want 1", len(rows)) 1035 } 1036 if !rows[0].Valid { 1037 return fmt.Errorf("rows[0] is NULL") 1038 } 1039 var i, j int64 1040 if err := rows[0].Row.Columns(&i, &j); err != nil { 1041 return err 1042 } 1043 if i != 1 || j != 2 { 1044 return fmt.Errorf("got (%d,%d), want (1,2)", i, j) 1045 } 1046 return nil 1047 }, 1048 }, 1049 { 1050 q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`}, 1051 want: func(r *Row) error { 1052 // Test Row.ToStruct. 1053 s := struct { 1054 Col1 []*struct { 1055 Foo int64 `spanner:"foo"` 1056 Bar int64 `spanner:"bar"` 1057 } `spanner:"col1"` 1058 }{} 1059 if err := r.ToStruct(&s); err != nil { 1060 return err 1061 } 1062 want := struct { 1063 Col1 []*struct { 1064 Foo int64 `spanner:"foo"` 1065 Bar int64 `spanner:"bar"` 1066 } `spanner:"col1"` 1067 }{ 1068 Col1: []*struct { 1069 Foo int64 `spanner:"foo"` 1070 Bar int64 `spanner:"bar"` 1071 }{ 1072 { 1073 Foo: 1, 1074 Bar: 2, 1075 }, 1076 }, 1077 } 1078 if !testEqual(want, s) { 1079 return fmt.Errorf("unexpected decoding result: %v, want %v", s, want) 1080 } 1081 return nil 1082 }, 1083 }, 1084 } 1085 for i, test := range tests { 1086 iter := client.Single().Query(ctx, test.q) 1087 defer iter.Stop() 1088 row, err := iter.Next() 1089 if err != nil { 1090 t.Errorf("%d: %v", i, err) 1091 continue 1092 } 1093 if err := test.want(row); err != nil { 1094 t.Errorf("%d: %v", i, err) 1095 continue 1096 } 1097 } 1098} 1099 1100func TestIntegration_StructParametersUnsupported(t *testing.T) { 1101 ctx := context.Background() 1102 client, _, cleanup := prepareIntegrationTest(ctx, t, nil) 1103 defer cleanup() 1104 1105 for _, test := range []struct { 1106 param interface{} 1107 wantCode codes.Code 1108 wantMsgPart string 1109 }{ 1110 { 1111 struct { 1112 Field int 1113 }{10}, 1114 codes.Unimplemented, 1115 "Unsupported query shape: " + 1116 "A struct value cannot be returned as a column value. " + 1117 "Rewrite the query to flatten the struct fields in the result.", 1118 }, 1119 { 1120 []struct { 1121 Field int 1122 }{{10}, {20}}, 1123 codes.Unimplemented, 1124 "Unsupported query shape: " + 1125 "This query can return a null-valued array of struct, " + 1126 "which is not supported by Spanner.", 1127 }, 1128 } { 1129 iter := client.Single().Query(ctx, Statement{ 1130 SQL: "SELECT @p", 1131 Params: map[string]interface{}{"p": test.param}, 1132 }) 1133 _, err := iter.Next() 1134 iter.Stop() 1135 if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok { 1136 t.Fatal(msg) 1137 } 1138 } 1139} 1140 1141// Test queries of the form "SELECT expr". 1142func TestIntegration_QueryExpressions(t *testing.T) { 1143 ctx := context.Background() 1144 client, _, cleanup := prepareIntegrationTest(ctx, t, nil) 1145 defer cleanup() 1146 1147 newRow := func(vals []interface{}) *Row { 1148 row, err := NewRow(make([]string, len(vals)), vals) 1149 if err != nil { 1150 t.Fatal(err) 1151 } 1152 return row 1153 } 1154 1155 tests := []struct { 1156 expr string 1157 want interface{} 1158 }{ 1159 {"1", int64(1)}, 1160 {"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}}, 1161 {"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}}, 1162 {"IEEE_DIVIDE(1, 0)", math.Inf(1)}, 1163 {"IEEE_DIVIDE(-1, 0)", math.Inf(-1)}, 1164 {"IEEE_DIVIDE(0, 0)", math.NaN()}, 1165 // TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate. 1166 {"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}}, 1167 {"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}}, 1168 {"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}}, 1169 } 1170 for _, test := range tests { 1171 iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr}) 1172 defer iter.Stop() 1173 row, err := iter.Next() 1174 if err != nil { 1175 t.Errorf("%q: %v", test.expr, err) 1176 continue 1177 } 1178 // Create new instance of type of test.want. 1179 gotp := reflect.New(reflect.TypeOf(test.want)) 1180 if err := row.Column(0, gotp.Interface()); err != nil { 1181 t.Errorf("%q: Column returned error %v", test.expr, err) 1182 continue 1183 } 1184 got := reflect.Indirect(gotp).Interface() 1185 // TODO(jba): remove isNaN special case when we have a better equality predicate. 1186 if isNaN(got) && isNaN(test.want) { 1187 continue 1188 } 1189 if !testEqual(got, test.want) { 1190 t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want) 1191 } 1192 } 1193} 1194 1195func TestIntegration_QueryStats(t *testing.T) { 1196 ctx := context.Background() 1197 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1198 defer cleanup() 1199 1200 accounts := []*Mutation{ 1201 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), 1202 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), 1203 } 1204 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1205 t.Fatal(err) 1206 } 1207 const sql = "SELECT Balance FROM Accounts" 1208 1209 qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil}) 1210 if err != nil { 1211 t.Fatal(err) 1212 } 1213 if len(qp.PlanNodes) == 0 { 1214 t.Error("got zero plan nodes, expected at least one") 1215 } 1216 1217 iter := client.Single().QueryWithStats(ctx, Statement{sql, nil}) 1218 defer iter.Stop() 1219 for { 1220 _, err := iter.Next() 1221 if err == iterator.Done { 1222 break 1223 } 1224 if err != nil { 1225 t.Fatal(err) 1226 } 1227 } 1228 if iter.QueryPlan == nil { 1229 t.Error("got nil QueryPlan, expected one") 1230 } 1231 if iter.QueryStats == nil { 1232 t.Error("got nil QueryStats, expected some") 1233 } 1234} 1235 1236func TestIntegration_InvalidDatabase(t *testing.T) { 1237 if testProjectID == "" { 1238 t.Skip("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing") 1239 } 1240 ctx := context.Background() 1241 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID) 1242 c, err := createClient(ctx, dbPath) 1243 // Client creation should succeed even if the database is invalid. 1244 if err != nil { 1245 t.Fatal(err) 1246 } 1247 _, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"}) 1248 if msg, ok := matchError(err, codes.NotFound, ""); !ok { 1249 t.Fatal(msg) 1250 } 1251} 1252 1253func TestIntegration_ReadErrors(t *testing.T) { 1254 ctx := context.Background() 1255 client, _, cleanup := prepareIntegrationTest(ctx, t, readDBStatements) 1256 defer cleanup() 1257 1258 // Read over invalid table fails 1259 _, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"}) 1260 if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok { 1261 t.Error(msg) 1262 } 1263 // Read over invalid column fails 1264 _, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"}) 1265 if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok { 1266 t.Error(msg) 1267 } 1268 1269 // Invalid query fails 1270 iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"}) 1271 defer iter.Stop() 1272 _, err = iter.Next() 1273 if msg, ok := matchError(err, codes.InvalidArgument, "unrecognized name"); !ok { 1274 t.Error(msg) 1275 } 1276 1277 // Read should fail on cancellation. 1278 cctx, cancel := context.WithCancel(ctx) 1279 cancel() 1280 _, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"}) 1281 if msg, ok := matchError(err, codes.Canceled, ""); !ok { 1282 t.Error(msg) 1283 } 1284 // Read should fail if deadline exceeded. 1285 dctx, cancel := context.WithTimeout(ctx, time.Nanosecond) 1286 defer cancel() 1287 <-dctx.Done() 1288 _, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"}) 1289 if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok { 1290 t.Error(msg) 1291 } 1292} 1293 1294// Test TransactionRunner. Test that transactions are aborted and retried as expected. 1295func TestIntegration_TransactionRunner(t *testing.T) { 1296 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1297 defer cancel() 1298 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1299 defer cleanup() 1300 1301 // Test 1: User error should abort the transaction. 1302 _, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1303 tx.BufferWrite([]*Mutation{ 1304 Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})}) 1305 return errors.New("user error") 1306 }) 1307 // Empty read. 1308 rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"})) 1309 if err != nil { 1310 t.Fatal(err) 1311 } 1312 if got, want := len(rows), 0; got != want { 1313 t.Errorf("Empty read, got %d, want %d.", got, want) 1314 } 1315 1316 // Test 2: Expect abort and retry. 1317 // We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by committing writes to the column txn2 have read, 1318 // and expect the following read to abort and txn2 retries. 1319 1320 // Set up two accounts 1321 accounts := []*Mutation{ 1322 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}), 1323 Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}), 1324 } 1325 if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil { 1326 t.Fatal(err) 1327 } 1328 1329 var ( 1330 cTxn1Start = make(chan struct{}) 1331 cTxn1Commit = make(chan struct{}) 1332 cTxn2Start = make(chan struct{}) 1333 wg sync.WaitGroup 1334 ) 1335 1336 // read balance, check error if we don't expect abort. 1337 readBalance := func(tx interface { 1338 ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) 1339 }, key int64, expectAbort bool) (int64, error) { 1340 var b int64 1341 r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"}) 1342 if e != nil { 1343 if expectAbort && !isAbortErr(e) { 1344 t.Errorf("ReadRow got %v, want Abort error.", e) 1345 } 1346 return b, e 1347 } 1348 if ce := r.Column(0, &b); ce != nil { 1349 return b, ce 1350 } 1351 return b, nil 1352 } 1353 1354 wg.Add(2) 1355 // Txn 1 1356 go func() { 1357 defer wg.Done() 1358 var once sync.Once 1359 _, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1360 b, e := readBalance(tx, 1, false) 1361 if e != nil { 1362 return e 1363 } 1364 // txn 1 can abort, in that case we skip closing the channel on retry. 1365 once.Do(func() { close(cTxn1Start) }) 1366 e = tx.BufferWrite([]*Mutation{ 1367 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})}) 1368 if e != nil { 1369 return e 1370 } 1371 // Wait for second transaction. 1372 <-cTxn2Start 1373 return nil 1374 }) 1375 close(cTxn1Commit) 1376 if e != nil { 1377 t.Errorf("Transaction 1 commit, got %v, want nil.", e) 1378 } 1379 }() 1380 // Txn 2 1381 go func() { 1382 // Wait until txn 1 starts. 1383 <-cTxn1Start 1384 defer wg.Done() 1385 var ( 1386 once sync.Once 1387 b1 int64 1388 b2 int64 1389 e error 1390 ) 1391 _, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1392 if b1, e = readBalance(tx, 1, false); e != nil { 1393 return e 1394 } 1395 // Skip closing channel on retry. 1396 once.Do(func() { close(cTxn2Start) }) 1397 // Wait until txn 1 successfully commits. 1398 <-cTxn1Commit 1399 // Txn1 has committed and written a balance to the account. 1400 // Now this transaction (txn2) reads and re-writes the balance. 1401 // The first time through, it will abort because it overlaps with txn1. 1402 // Then it will retry after txn1 commits, and succeed. 1403 if b2, e = readBalance(tx, 2, true); e != nil { 1404 return e 1405 } 1406 return tx.BufferWrite([]*Mutation{ 1407 Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})}) 1408 }) 1409 if e != nil { 1410 t.Errorf("Transaction 2 commit, got %v, want nil.", e) 1411 } 1412 }() 1413 wg.Wait() 1414 // Check that both transactions' effects are visible. 1415 for i := int64(1); i <= int64(2); i++ { 1416 if b, e := readBalance(client.Single(), i, false); e != nil { 1417 t.Fatalf("ReadBalance for key %d error %v.", i, e) 1418 } else if b != i { 1419 t.Errorf("Balance for key %d, got %d, want %d.", i, b, i) 1420 } 1421 } 1422} 1423 1424// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then 1425// serialize and deserialize both transaction and partition to be used in 1426// execution on another client, and compare results. 1427func TestIntegration_BatchQuery(t *testing.T) { 1428 // Set up testing environment. 1429 var ( 1430 client2 *Client 1431 err error 1432 ) 1433 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1434 defer cancel() 1435 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements) 1436 defer cleanup() 1437 1438 if err = populate(ctx, client); err != nil { 1439 t.Fatal(err) 1440 } 1441 if client2, err = createClient(ctx, dbPath); err != nil { 1442 t.Fatal(err) 1443 } 1444 defer client2.Close() 1445 1446 // PartitionQuery 1447 var ( 1448 txn *BatchReadOnlyTransaction 1449 partitions []*Partition 1450 stmt = Statement{SQL: "SELECT * FROM test;"} 1451 ) 1452 1453 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1454 t.Fatal(err) 1455 } 1456 defer txn.Cleanup(ctx) 1457 if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { 1458 t.Fatal(err) 1459 } 1460 1461 // Reconstruct BatchReadOnlyTransactionID and execute partitions 1462 var ( 1463 tid2 BatchReadOnlyTransactionID 1464 data []byte 1465 gotResult bool // if we get matching result from two separate txns 1466 ) 1467 if data, err = txn.ID.MarshalBinary(); err != nil { 1468 t.Fatalf("encoding failed %v", err) 1469 } 1470 if err = tid2.UnmarshalBinary(data); err != nil { 1471 t.Fatalf("decoding failed %v", err) 1472 } 1473 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1474 1475 // Execute Partitions and compare results 1476 for i, p := range partitions { 1477 iter := txn.Execute(ctx, p) 1478 defer iter.Stop() 1479 p2 := serdesPartition(t, i, p) 1480 iter2 := txn2.Execute(ctx, &p2) 1481 defer iter2.Stop() 1482 1483 row1, err1 := iter.Next() 1484 row2, err2 := iter2.Next() 1485 if err1 != err2 { 1486 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1487 continue 1488 } 1489 if !testEqual(row1, row2) { 1490 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1491 continue 1492 } 1493 if row1 == nil { 1494 continue 1495 } 1496 var a, b string 1497 if err = row1.Columns(&a, &b); err != nil { 1498 t.Fatalf("failed to parse row %v", err) 1499 continue 1500 } 1501 if a == str1 && b == str2 { 1502 gotResult = true 1503 } 1504 } 1505 if !gotResult { 1506 t.Fatalf("execution didn't return expected values") 1507 } 1508} 1509 1510// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery 1511func TestIntegration_BatchRead(t *testing.T) { 1512 // Set up testing environment. 1513 var ( 1514 client2 *Client 1515 err error 1516 ) 1517 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1518 defer cancel() 1519 client, dbPath, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements) 1520 defer cleanup() 1521 1522 if err = populate(ctx, client); err != nil { 1523 t.Fatal(err) 1524 } 1525 if client2, err = createClient(ctx, dbPath); err != nil { 1526 t.Fatal(err) 1527 } 1528 defer client2.Close() 1529 1530 // PartitionRead 1531 var ( 1532 txn *BatchReadOnlyTransaction 1533 partitions []*Partition 1534 ) 1535 1536 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1537 t.Fatal(err) 1538 } 1539 defer txn.Cleanup(ctx) 1540 if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 1541 t.Fatal(err) 1542 } 1543 1544 // Reconstruct BatchReadOnlyTransactionID and execute partitions 1545 var ( 1546 tid2 BatchReadOnlyTransactionID 1547 data []byte 1548 gotResult bool // if we get matching result from two separate txns 1549 ) 1550 if data, err = txn.ID.MarshalBinary(); err != nil { 1551 t.Fatalf("encoding failed %v", err) 1552 } 1553 if err = tid2.UnmarshalBinary(data); err != nil { 1554 t.Fatalf("decoding failed %v", err) 1555 } 1556 txn2 := client2.BatchReadOnlyTransactionFromID(tid2) 1557 1558 // Execute Partitions and compare results 1559 for i, p := range partitions { 1560 iter := txn.Execute(ctx, p) 1561 defer iter.Stop() 1562 p2 := serdesPartition(t, i, p) 1563 iter2 := txn2.Execute(ctx, &p2) 1564 defer iter2.Stop() 1565 1566 row1, err1 := iter.Next() 1567 row2, err2 := iter2.Next() 1568 if err1 != err2 { 1569 t.Fatalf("execution failed for different reasons: %v, %v", err1, err2) 1570 continue 1571 } 1572 if !testEqual(row1, row2) { 1573 t.Fatalf("execution returned different values: %v, %v", row1, row2) 1574 continue 1575 } 1576 if row1 == nil { 1577 continue 1578 } 1579 var a, b string 1580 if err = row1.Columns(&a, &b); err != nil { 1581 t.Fatalf("failed to parse row %v", err) 1582 continue 1583 } 1584 if a == str1 && b == str2 { 1585 gotResult = true 1586 } 1587 } 1588 if !gotResult { 1589 t.Fatalf("execution didn't return expected values") 1590 } 1591} 1592 1593// Test normal txReadEnv method on BatchReadOnlyTransaction. 1594func TestIntegration_BROTNormal(t *testing.T) { 1595 // Set up testing environment and create txn. 1596 var ( 1597 txn *BatchReadOnlyTransaction 1598 err error 1599 row *Row 1600 i int64 1601 ) 1602 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1603 defer cancel() 1604 client, _, cleanup := prepareIntegrationTest(ctx, t, simpleDBStatements) 1605 defer cleanup() 1606 1607 if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil { 1608 t.Fatal(err) 1609 } 1610 defer txn.Cleanup(ctx) 1611 if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { 1612 t.Fatal(err) 1613 } 1614 // Normal query should work with BatchReadOnlyTransaction 1615 stmt2 := Statement{SQL: "SELECT 1"} 1616 iter := txn.Query(ctx, stmt2) 1617 defer iter.Stop() 1618 1619 row, err = iter.Next() 1620 if err != nil { 1621 t.Errorf("query failed with %v", err) 1622 } 1623 if err = row.Columns(&i); err != nil { 1624 t.Errorf("failed to parse row %v", err) 1625 } 1626} 1627 1628func TestIntegration_CommitTimestamp(t *testing.T) { 1629 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1630 defer cancel() 1631 client, _, cleanup := prepareIntegrationTest(ctx, t, ctsDBStatements) 1632 defer cleanup() 1633 1634 type testTableRow struct { 1635 Key string 1636 Ts NullTime 1637 } 1638 1639 var ( 1640 cts1, cts2, ts1, ts2 time.Time 1641 err error 1642 ) 1643 1644 // Apply mutation in sequence, expect to see commit timestamp in good order, check also the commit timestamp returned 1645 for _, it := range []struct { 1646 k string 1647 t *time.Time 1648 }{ 1649 {"a", &cts1}, 1650 {"b", &cts2}, 1651 } { 1652 tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}} 1653 m, err := InsertStruct("TestTable", tt) 1654 if err != nil { 1655 t.Fatal(err) 1656 } 1657 *it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()) 1658 if err != nil { 1659 t.Fatal(err) 1660 } 1661 } 1662 1663 txn := client.ReadOnlyTransaction() 1664 for _, it := range []struct { 1665 k string 1666 t *time.Time 1667 }{ 1668 {"a", &ts1}, 1669 {"b", &ts2}, 1670 } { 1671 if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil { 1672 t.Fatal(err) 1673 } else { 1674 var got testTableRow 1675 if err := r.ToStruct(&got); err != nil { 1676 t.Fatal(err) 1677 } 1678 *it.t = got.Ts.Time 1679 } 1680 } 1681 if !cts1.Equal(ts1) { 1682 t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1) 1683 } 1684 if !cts2.Equal(ts2) { 1685 t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2) 1686 } 1687 1688 // Try writing a timestamp in the future to commit timestamp, expect error 1689 _, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce()) 1690 if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok { 1691 t.Error(msg) 1692 } 1693} 1694 1695func TestIntegration_DML(t *testing.T) { 1696 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 1697 defer cancel() 1698 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 1699 defer cleanup() 1700 1701 // Function that reads a single row's first name from within a transaction. 1702 readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) { 1703 row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"}) 1704 if err != nil { 1705 return "", err 1706 } 1707 var fn string 1708 if err := row.Column(0, &fn); err != nil { 1709 return "", err 1710 } 1711 return fn, nil 1712 } 1713 1714 // Function that reads multiple rows' first names from outside a read/write transaction. 1715 readFirstNames := func(keys ...int) []string { 1716 var ks []KeySet 1717 for _, k := range keys { 1718 ks = append(ks, Key{k}) 1719 } 1720 iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"}) 1721 var got []string 1722 var fn string 1723 err := iter.Do(func(row *Row) error { 1724 if err := row.Column(0, &fn); err != nil { 1725 return err 1726 } 1727 got = append(got, fn) 1728 return nil 1729 }) 1730 if err != nil { 1731 t.Fatalf("readFirstNames(%v): %v", keys, err) 1732 } 1733 return got 1734 } 1735 1736 // Use ReadWriteTransaction.Query to execute a DML statement. 1737 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1738 iter := tx.Query(ctx, Statement{ 1739 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`, 1740 }) 1741 defer iter.Stop() 1742 if row, err := iter.Next(); err != iterator.Done { 1743 t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err) 1744 } 1745 if iter.RowCount != 1 { 1746 t.Errorf("row count: got %d, want 1", iter.RowCount) 1747 } 1748 // The results of the DML statement should be visible to the transaction. 1749 got, err := readFirstName(tx, 1) 1750 if err != nil { 1751 return err 1752 } 1753 if want := "Umm"; got != want { 1754 t.Errorf("got %q, want %q", got, want) 1755 } 1756 return nil 1757 }) 1758 if err != nil { 1759 t.Fatal(err) 1760 } 1761 1762 // Use ReadWriteTransaction.Update to execute a DML statement. 1763 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1764 count, err := tx.Update(ctx, Statement{ 1765 SQL: `Insert INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`, 1766 }) 1767 if err != nil { 1768 t.Fatal(err) 1769 } 1770 if count != 1 { 1771 t.Errorf("row count: got %d, want 1", count) 1772 } 1773 got, err := readFirstName(tx, 2) 1774 if err != nil { 1775 return err 1776 } 1777 if want := "Eduard"; got != want { 1778 t.Errorf("got %q, want %q", got, want) 1779 } 1780 return nil 1781 1782 }) 1783 if err != nil { 1784 t.Fatal(err) 1785 } 1786 1787 // Roll back a DML statement and confirm that it didn't happen. 1788 var fail = errors.New("fail") 1789 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1790 _, err := tx.Update(ctx, Statement{ 1791 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 1792 }) 1793 if err != nil { 1794 return err 1795 } 1796 return fail 1797 }) 1798 if err != fail { 1799 t.Fatalf("rolling back: got error %v, want the error 'fail'", err) 1800 } 1801 _, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"}) 1802 if got, want := ErrCode(err), codes.NotFound; got != want { 1803 t.Errorf("got %s, want %s", got, want) 1804 } 1805 1806 // Run two DML statements in the same transaction. 1807 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1808 _, err := tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`}) 1809 if err != nil { 1810 return err 1811 } 1812 _, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`}) 1813 if err != nil { 1814 return err 1815 } 1816 return nil 1817 }) 1818 if err != nil { 1819 t.Fatal(err) 1820 } 1821 got := readFirstNames(1, 2) 1822 want := []string{"Oum", "Eddie"} 1823 if !testEqual(got, want) { 1824 t.Errorf("got %v, want %v", got, want) 1825 } 1826 1827 // Run a DML statement and an ordinary mutation in the same transaction. 1828 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1829 _, err := tx.Update(ctx, Statement{ 1830 SQL: `INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`, 1831 }) 1832 if err != nil { 1833 return err 1834 } 1835 tx.BufferWrite([]*Mutation{ 1836 Insert("Singers", []string{"SingerId", "FirstName", "LastName"}, 1837 []interface{}{4, "Andy", "Irvine"}), 1838 }) 1839 return nil 1840 }) 1841 if err != nil { 1842 t.Fatal(err) 1843 } 1844 got = readFirstNames(3, 4) 1845 want = []string{"Audra", "Andy"} 1846 if !testEqual(got, want) { 1847 t.Errorf("got %v, want %v", got, want) 1848 } 1849 1850 // Attempt to run a query using update. 1851 _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { 1852 _, err := tx.Update(ctx, Statement{SQL: `SELECT FirstName from Singers`}) 1853 return err 1854 }) 1855 if got, want := ErrCode(err), codes.InvalidArgument; got != want { 1856 t.Errorf("got %s, want %s", got, want) 1857 } 1858} 1859 1860func TestIntegration_StructParametersBind(t *testing.T) { 1861 t.Parallel() 1862 ctx := context.Background() 1863 client, _, cleanup := prepareIntegrationTest(ctx, t, nil) 1864 defer cleanup() 1865 1866 type tRow []interface{} 1867 type tRows []struct{ trow tRow } 1868 1869 type allFields struct { 1870 Stringf string 1871 Intf int 1872 Boolf bool 1873 Floatf float64 1874 Bytef []byte 1875 Timef time.Time 1876 Datef civil.Date 1877 } 1878 allColumns := []string{ 1879 "Stringf", 1880 "Intf", 1881 "Boolf", 1882 "Floatf", 1883 "Bytef", 1884 "Timef", 1885 "Datef", 1886 } 1887 s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1} 1888 s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2} 1889 1890 dynamicStructType := reflect.StructOf([]reflect.StructField{ 1891 {Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`}, 1892 }) 1893 s3 := reflect.New(dynamicStructType) 1894 s3.Elem().Field(0).Set(reflect.ValueOf(t1)) 1895 1896 for i, test := range []struct { 1897 param interface{} 1898 sql string 1899 cols []string 1900 trows tRows 1901 }{ 1902 // Struct value. 1903 { 1904 s1, 1905 "SELECT" + 1906 " @p.Stringf," + 1907 " @p.Intf," + 1908 " @p.Boolf," + 1909 " @p.Floatf," + 1910 " @p.Bytef," + 1911 " @p.Timef," + 1912 " @p.Datef", 1913 allColumns, 1914 tRows{ 1915 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 1916 }, 1917 }, 1918 // Array of struct value. 1919 { 1920 []allFields{s1, s2}, 1921 "SELECT * FROM UNNEST(@p)", 1922 allColumns, 1923 tRows{ 1924 {tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}}, 1925 {tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}}, 1926 }, 1927 }, 1928 // Null struct. 1929 { 1930 (*allFields)(nil), 1931 "SELECT @p IS NULL", 1932 []string{""}, 1933 tRows{ 1934 {tRow{true}}, 1935 }, 1936 }, 1937 // Null Array of struct. 1938 { 1939 []allFields(nil), 1940 "SELECT @p IS NULL", 1941 []string{""}, 1942 tRows{ 1943 {tRow{true}}, 1944 }, 1945 }, 1946 // Empty struct. 1947 { 1948 struct{}{}, 1949 "SELECT @p IS NULL ", 1950 []string{""}, 1951 tRows{ 1952 {tRow{false}}, 1953 }, 1954 }, 1955 // Empty array of struct. 1956 { 1957 []allFields{}, 1958 "SELECT * FROM UNNEST(@p) ", 1959 allColumns, 1960 tRows{}, 1961 }, 1962 // Struct with duplicate fields. 1963 { 1964 struct { 1965 A int `spanner:"field"` 1966 B int `spanner:"field"` 1967 }{10, 20}, 1968 "SELECT * FROM UNNEST([@p]) ", 1969 []string{"field", "field"}, 1970 tRows{ 1971 {tRow{10, 20}}, 1972 }, 1973 }, 1974 // Struct with unnamed fields. 1975 { 1976 struct { 1977 A string `spanner:""` 1978 }{"hello"}, 1979 "SELECT * FROM UNNEST([@p]) ", 1980 []string{""}, 1981 tRows{ 1982 {tRow{"hello"}}, 1983 }, 1984 }, 1985 // Mixed struct. 1986 { 1987 struct { 1988 DynamicStructField interface{} `spanner:"f1"` 1989 ArrayStructField []*allFields `spanner:"f2"` 1990 }{ 1991 DynamicStructField: s3.Interface(), 1992 ArrayStructField: []*allFields{nil}, 1993 }, 1994 "SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ", 1995 []string{"ff1", "", ""}, 1996 tRows{ 1997 {tRow{t1, 1, true}}, 1998 }, 1999 }, 2000 } { 2001 iter := client.Single().Query(ctx, Statement{ 2002 SQL: test.sql, 2003 Params: map[string]interface{}{"p": test.param}, 2004 }) 2005 var gotRows []*Row 2006 err := iter.Do(func(r *Row) error { 2007 gotRows = append(gotRows, r) 2008 return nil 2009 }) 2010 if err != nil { 2011 t.Errorf("Failed to execute test case %d, error: %v", i, err) 2012 } 2013 2014 var wantRows []*Row 2015 for j, row := range test.trows { 2016 r, err := NewRow(test.cols, row.trow) 2017 if err != nil { 2018 t.Errorf("Invalid row %d in test case %d", j, i) 2019 } 2020 wantRows = append(wantRows, r) 2021 } 2022 if !testEqual(gotRows, wantRows) { 2023 t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows) 2024 } 2025 } 2026} 2027 2028func TestIntegration_PDML(t *testing.T) { 2029 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2030 defer cancel() 2031 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2032 defer cleanup() 2033 2034 columns := []string{"SingerId", "FirstName", "LastName"} 2035 2036 // Populate the Singers table. 2037 var muts []*Mutation 2038 for _, row := range [][]interface{}{ 2039 {1, "Umm", "Kulthum"}, 2040 {2, "Eduard", "Khil"}, 2041 {3, "Audra", "McDonald"}, 2042 } { 2043 muts = append(muts, Insert("Singers", columns, row)) 2044 } 2045 if _, err := client.Apply(ctx, muts); err != nil { 2046 t.Fatal(err) 2047 } 2048 // Identifiers in PDML statements must be fully qualified. 2049 // TODO(jba): revisit the above. 2050 count, err := client.PartitionedUpdate(ctx, Statement{ 2051 SQL: `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= 3`, 2052 }) 2053 if err != nil { 2054 t.Fatal(err) 2055 } 2056 if want := int64(3); count != want { 2057 t.Errorf("got %d, want %d", count, want) 2058 } 2059 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2060 if err != nil { 2061 t.Fatal(err) 2062 } 2063 want := [][]interface{}{ 2064 {int64(1), "changed", "Kulthum"}, 2065 {int64(2), "changed", "Khil"}, 2066 {int64(3), "changed", "McDonald"}, 2067 } 2068 if !testEqual(got, want) { 2069 t.Errorf("\ngot %v\nwant%v", got, want) 2070 } 2071} 2072 2073func TestBatchDML(t *testing.T) { 2074 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2075 defer cancel() 2076 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2077 defer cleanup() 2078 2079 columns := []string{"SingerId", "FirstName", "LastName"} 2080 2081 // Populate the Singers table. 2082 var muts []*Mutation 2083 for _, row := range [][]interface{}{ 2084 {1, "Umm", "Kulthum"}, 2085 {2, "Eduard", "Khil"}, 2086 {3, "Audra", "McDonald"}, 2087 } { 2088 muts = append(muts, Insert("Singers", columns, row)) 2089 } 2090 if _, err := client.Apply(ctx, muts); err != nil { 2091 t.Fatal(err) 2092 } 2093 2094 var counts []int64 2095 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2096 counts, err = tx.BatchUpdate(ctx, []Statement{ 2097 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2098 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2099 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2100 }) 2101 return err 2102 }) 2103 2104 if err != nil { 2105 t.Fatal(err) 2106 } 2107 if want := []int64{1, 1, 1}; !testEqual(counts, want) { 2108 t.Fatalf("got %d, want %d", counts, want) 2109 } 2110 got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns)) 2111 if err != nil { 2112 t.Fatal(err) 2113 } 2114 want := [][]interface{}{ 2115 {int64(1), "changed 1", "Kulthum"}, 2116 {int64(2), "changed 2", "Khil"}, 2117 {int64(3), "changed 3", "McDonald"}, 2118 } 2119 if !testEqual(got, want) { 2120 t.Errorf("\ngot %v\nwant%v", got, want) 2121 } 2122} 2123 2124func TestBatchDML_NoStatements(t *testing.T) { 2125 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2126 defer cancel() 2127 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2128 defer cleanup() 2129 2130 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2131 _, err = tx.BatchUpdate(ctx, []Statement{}) 2132 return err 2133 }) 2134 if err == nil { 2135 t.Fatal("expected error, got nil") 2136 } 2137 if s, ok := status.FromError(err); ok { 2138 if s.Code() != codes.InvalidArgument { 2139 t.Fatalf("expected InvalidArgument, got %v", err) 2140 } 2141 } else { 2142 t.Fatalf("expected InvalidArgument, got %v", err) 2143 } 2144} 2145 2146func TestBatchDML_TwoStatements(t *testing.T) { 2147 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2148 defer cancel() 2149 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2150 defer cleanup() 2151 2152 columns := []string{"SingerId", "FirstName", "LastName"} 2153 2154 // Populate the Singers table. 2155 var muts []*Mutation 2156 for _, row := range [][]interface{}{ 2157 {1, "Umm", "Kulthum"}, 2158 {2, "Eduard", "Khil"}, 2159 {3, "Audra", "McDonald"}, 2160 } { 2161 muts = append(muts, Insert("Singers", columns, row)) 2162 } 2163 if _, err := client.Apply(ctx, muts); err != nil { 2164 t.Fatal(err) 2165 } 2166 2167 var updateCount int64 2168 var batchCounts []int64 2169 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2170 batchCounts, err = tx.BatchUpdate(ctx, []Statement{ 2171 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2172 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`}, 2173 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2174 }) 2175 if err != nil { 2176 return err 2177 } 2178 2179 updateCount, err = tx.Update(ctx, Statement{SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}) 2180 return err 2181 }) 2182 if err != nil { 2183 t.Fatal(err) 2184 } 2185 if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) { 2186 t.Fatalf("got %d, want %d", batchCounts, want) 2187 } 2188 if updateCount != 1 { 2189 t.Fatalf("got %v, want 1", updateCount) 2190 } 2191} 2192 2193// TODO(deklerk) this currently does not work because the transaction appears to 2194// get rolled back after a single statement fails. b/120158761 2195func TestBatchDML_Error(t *testing.T) { 2196 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 2197 defer cancel() 2198 client, _, cleanup := prepareIntegrationTest(ctx, t, singerDBStatements) 2199 defer cleanup() 2200 2201 columns := []string{"SingerId", "FirstName", "LastName"} 2202 2203 // Populate the Singers table. 2204 var muts []*Mutation 2205 for _, row := range [][]interface{}{ 2206 {1, "Umm", "Kulthum"}, 2207 {2, "Eduard", "Khil"}, 2208 {3, "Audra", "McDonald"}, 2209 } { 2210 muts = append(muts, Insert("Singers", columns, row)) 2211 } 2212 if _, err := client.Apply(ctx, muts); err != nil { 2213 t.Fatal(err) 2214 } 2215 2216 _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) { 2217 counts, err := tx.BatchUpdate(ctx, []Statement{ 2218 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}, 2219 {SQL: `some illegal statement`}, 2220 {SQL: `UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}, 2221 }) 2222 if err == nil { 2223 t.Fatal("expected err, got nil") 2224 } 2225 if want := []int64{1}; !testEqual(counts, want) { 2226 t.Fatalf("got %d, want %d", counts, want) 2227 } 2228 2229 got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns)) 2230 if err != nil { 2231 t.Fatal(err) 2232 } 2233 want := [][]interface{}{ 2234 {int64(1), "changed 1", "Kulthum"}, 2235 {int64(2), "Eduard", "Khil"}, 2236 {int64(3), "Audra", "McDonald"}, 2237 } 2238 if !testEqual(got, want) { 2239 t.Errorf("\ngot %v\nwant%v", got, want) 2240 } 2241 2242 return nil 2243 }) 2244 if err != nil { 2245 t.Fatal(err) 2246 } 2247} 2248 2249// Prepare initializes Cloud Spanner testing DB and clients. 2250func prepareIntegrationTest(ctx context.Context, t *testing.T, statements []string) (*Client, string, func()) { 2251 if admin == nil { 2252 t.Skip("Integration tests skipped") 2253 } 2254 // Construct a unique test DB name. 2255 dbName := dbNameSpace.New() 2256 2257 dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName) 2258 // Create database and tables. 2259 op, err := admin.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ 2260 Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID), 2261 CreateStatement: "CREATE DATABASE " + dbName, 2262 ExtraStatements: statements, 2263 }) 2264 if err != nil { 2265 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2266 } 2267 if _, err := op.Wait(ctx); err != nil { 2268 t.Fatalf("cannot create testing DB %v: %v", dbPath, err) 2269 } 2270 client, err := createClient(ctx, dbPath) 2271 if err != nil { 2272 t.Fatalf("cannot create data client on DB %v: %v", dbPath, err) 2273 } 2274 return client, dbPath, func() { 2275 client.Close() 2276 if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil { 2277 t.Logf("failed to drop database %s (error %v), might need a manual removal", 2278 dbPath, err) 2279 } 2280 } 2281} 2282 2283func cleanupDatabases() { 2284 if admin == nil { 2285 // Integration tests skipped. 2286 return 2287 } 2288 2289 ctx := context.Background() 2290 dbsParent := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID) 2291 dbsIter := admin.ListDatabases(ctx, &adminpb.ListDatabasesRequest{Parent: dbsParent}) 2292 expireAge := 24 * time.Hour 2293 2294 for { 2295 db, err := dbsIter.Next() 2296 if err == iterator.Done { 2297 break 2298 } 2299 if err != nil { 2300 panic(err) 2301 } 2302 // TODO(deklerk) When we have the ability to programmatically create 2303 // instances, we can create an instance with uid.New and delete all 2304 // tables in it. For now, we rely on matching prefixes. 2305 if dbNameSpace.Older(db.Name, expireAge) { 2306 log.Printf("Dropping database %s", db.Name) 2307 2308 if err := admin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: db.Name}); err != nil { 2309 log.Printf("failed to drop database %s (error %v), might need a manual removal", 2310 db.Name, err) 2311 } 2312 } 2313 } 2314} 2315 2316func rangeReads(ctx context.Context, t *testing.T, client *Client) { 2317 checkRange := func(ks KeySet, wantNums ...int) { 2318 if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok { 2319 t.Errorf("key set %+v: %s", ks, msg) 2320 } 2321 } 2322 2323 checkRange(Key{"k1"}, 1) 2324 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4) 2325 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5) 2326 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5) 2327 checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4) 2328 2329 // Partial key specification. 2330 checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9) 2331 checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9) 2332 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10) 2333 checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11) 2334 2335 // The following produce empty ranges. 2336 // TODO(jba): Consider a multi-part key to illustrate partial key behavior. 2337 // checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen}) 2338 // checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen}) 2339 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen}) 2340 // checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed}) 2341 2342 // Prefix is component-wise, not string prefix. 2343 checkRange(Key{"k1"}.AsPrefix(), 1) 2344 checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2345 2346 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2347} 2348 2349func indexRangeReads(ctx context.Context, t *testing.T, client *Client) { 2350 checkRange := func(ks KeySet, wantNums ...int) { 2351 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns), 2352 wantNums); !ok { 2353 t.Errorf("key set %+v: %s", ks, msg) 2354 } 2355 } 2356 2357 checkRange(Key{"v1"}, 1) 2358 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4) 2359 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5) 2360 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5) 2361 checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4) 2362 2363 // // Partial key specification. 2364 checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9) 2365 checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9) 2366 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10) 2367 checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11) 2368 2369 // // The following produce empty ranges. 2370 // checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen}) 2371 // checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen}) 2372 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen}) 2373 // checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed}) 2374 2375 // // Prefix is component-wise, not string prefix. 2376 checkRange(Key{"v1"}.AsPrefix(), 1) 2377 checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14) 2378 checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14) 2379 2380 // Read from an index with DESC ordering. 2381 wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} 2382 if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns), 2383 wantNums); !ok { 2384 t.Errorf("desc: %s", msg) 2385 } 2386} 2387 2388type testTableRow struct{ Key, StringValue string } 2389 2390func compareRows(iter *RowIterator, wantNums []int) (string, bool) { 2391 rows, err := readAllTestTable(iter) 2392 if err != nil { 2393 return err.Error(), false 2394 } 2395 want := map[string]string{} 2396 for _, n := range wantNums { 2397 want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n) 2398 } 2399 got := map[string]string{} 2400 for _, r := range rows { 2401 got[r.Key] = r.StringValue 2402 } 2403 if !testEqual(got, want) { 2404 return fmt.Sprintf("got %v, want %v", got, want), false 2405 } 2406 return "", true 2407} 2408 2409func isNaN(x interface{}) bool { 2410 f, ok := x.(float64) 2411 if !ok { 2412 return false 2413 } 2414 return math.IsNaN(f) 2415} 2416 2417// createClient creates Cloud Spanner data client. 2418func createClient(ctx context.Context, dbPath string) (client *Client, err error) { 2419 client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{ 2420 SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.2}, 2421 }, option.WithTokenSource(testutil.TokenSource(ctx, Scope)), option.WithEndpoint(endpoint)) 2422 if err != nil { 2423 return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) 2424 } 2425 return client, nil 2426} 2427 2428// populate prepares the database with some data. 2429func populate(ctx context.Context, client *Client) error { 2430 // Populate data 2431 var err error 2432 m := InsertMap("test", map[string]interface{}{ 2433 "a": str1, 2434 "b": str2, 2435 }) 2436 _, err = client.Apply(ctx, []*Mutation{m}) 2437 return err 2438} 2439 2440func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) { 2441 if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) { 2442 return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false 2443 } 2444 return "", true 2445} 2446 2447func rowToValues(r *Row) ([]interface{}, error) { 2448 var x int64 2449 var y, z string 2450 if err := r.Column(0, &x); err != nil { 2451 return nil, err 2452 } 2453 if err := r.Column(1, &y); err != nil { 2454 return nil, err 2455 } 2456 if err := r.Column(2, &z); err != nil { 2457 return nil, err 2458 } 2459 return []interface{}{x, y, z}, nil 2460} 2461 2462func readAll(iter *RowIterator) ([][]interface{}, error) { 2463 defer iter.Stop() 2464 var vals [][]interface{} 2465 for { 2466 row, err := iter.Next() 2467 if err == iterator.Done { 2468 return vals, nil 2469 } 2470 if err != nil { 2471 return nil, err 2472 } 2473 v, err := rowToValues(row) 2474 if err != nil { 2475 return nil, err 2476 } 2477 vals = append(vals, v) 2478 } 2479} 2480 2481func readAllTestTable(iter *RowIterator) ([]testTableRow, error) { 2482 defer iter.Stop() 2483 var vals []testTableRow 2484 for { 2485 row, err := iter.Next() 2486 if err == iterator.Done { 2487 return vals, nil 2488 } 2489 if err != nil { 2490 return nil, err 2491 } 2492 var ttr testTableRow 2493 if err := row.ToStruct(&ttr); err != nil { 2494 return nil, err 2495 } 2496 vals = append(vals, ttr) 2497 } 2498} 2499