// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"math/big"
	"net/http"
	"os"
	"sort"
	"strings"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	gax "github.com/googleapis/gax-go"

	"cloud.google.com/go/civil"
	"cloud.google.com/go/httpreplay"
	"cloud.google.com/go/internal"
	"cloud.google.com/go/internal/pretty"
	"cloud.google.com/go/internal/testutil"
	"cloud.google.com/go/internal/uid"
	"cloud.google.com/go/storage"
	"golang.org/x/net/context"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
)

const replayFilename = "bigquery.replay"

var record = flag.Bool("record", false, "record RPCs")

var (
	client        *Client
	storageClient *storage.Client
	dataset       *Dataset
	schema        = Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "nums", Type: IntegerFieldType, Repeated: true},
		{Name: "rec", Type: RecordFieldType, Schema: Schema{
			{Name: "bool", Type: BooleanFieldType},
		}},
	}
	testTableExpiration  time.Time
	datasetIDs, tableIDs *uid.Space
)

// Note: integration tests cannot be run in parallel, because TestIntegration_Location
// modifies the client.

func TestMain(m *testing.M) {
	cleanup := initIntegrationTest()
	r := m.Run()
	cleanup()
	os.Exit(r)
}

func getClient(t *testing.T) *Client {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	return client
}

// If integration tests will be run, create a unique dataset for them.
// Return a cleanup function.
func initIntegrationTest() func() {
	ctx := context.Background()
	flag.Parse() // needed for testing.Short()
	projID := testutil.ProjID()
	switch {
	case testing.Short() && *record:
		log.Fatal("cannot combine -short and -record")
		return func() {}

	case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "":
		// go test -short with a replay file will replay the integration tests if the
		// environment variables are set.
		log.Printf("replaying from %s", replayFilename)
		httpreplay.DebugHeaders()
		replayer, err := httpreplay.NewReplayer(replayFilename)
		if err != nil {
			log.Fatal(err)
		}
		var t time.Time
		if err := json.Unmarshal(replayer.Initial(), &t); err != nil {
			log.Fatal(err)
		}
		hc, err := replayer.Client(ctx) // no creds needed
		if err != nil {
			log.Fatal(err)
		}
		client, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		cleanup := initTestState(client, t)
		return func() {
			cleanup()
			_ = replayer.Close() // No actionable error returned.
		}

	case testing.Short():
		// go test -short without a replay file skips the integration tests.
		if testutil.CanReplay(replayFilename) && projID != "" {
			log.Print("replay not supported for Go versions before 1.8")
		}
		client = nil
		storageClient = nil
		return func() {}

	default: // Run integration tests against a real backend.
		ts := testutil.TokenSource(ctx, Scope)
		if ts == nil {
			log.Println("Integration tests skipped. See CONTRIBUTING.md for details")
			return func() {}
		}
		bqOpt := option.WithTokenSource(ts)
		sOpt := option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))
		cleanup := func() {}
		now := time.Now().UTC()
		if *record {
			if !httpreplay.Supported() {
				log.Print("record not supported for Go versions before 1.8")
			} else {
				nowBytes, err := json.Marshal(now)
				if err != nil {
					log.Fatal(err)
				}
				recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes)
				if err != nil {
					log.Fatalf("could not record: %v", err)
				}
				log.Printf("recording to %s", replayFilename)
				hc, err := recorder.Client(ctx, bqOpt)
				if err != nil {
					log.Fatal(err)
				}
				bqOpt = option.WithHTTPClient(hc)
				hc, err = recorder.Client(ctx, sOpt)
				if err != nil {
					log.Fatal(err)
				}
				sOpt = option.WithHTTPClient(hc)
				cleanup = func() {
					if err := recorder.Close(); err != nil {
						log.Printf("saving recording: %v", err)
					}
				}
			}
		}
		var err error
		client, err = NewClient(ctx, projID, bqOpt)
		if err != nil {
			log.Fatalf("NewClient: %v", err)
		}
		storageClient, err = storage.NewClient(ctx, sOpt)
		if err != nil {
			log.Fatalf("storage.NewClient: %v", err)
		}
		c := initTestState(client, now)
		return func() { c(); cleanup() }
	}
}

func initTestState(client *Client, t time.Time) func() {
	// BigQuery does not accept hyphens in dataset or table IDs, so we create IDs
	// with underscores.
	ctx := context.Background()
	opts := &uid.Options{Sep: '_', Time: t}
	datasetIDs = uid.NewSpace("dataset", opts)
	tableIDs = uid.NewSpace("table", opts)
	testTableExpiration = t.Add(10 * time.Minute).Round(time.Second)
	// For replayability, seed the random source with t.
	Seed(t.UnixNano())

	dataset = client.Dataset(datasetIDs.New())
	if err := dataset.Create(ctx, nil); err != nil {
		log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err)
	}
	return func() {
		if err := dataset.DeleteWithContents(ctx); err != nil {
			log.Printf("could not delete %s", dataset.DatasetID)
		}
	}
}
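
// How these modes are typically invoked (a sketch; the exact project and
// credential environment variables are described in CONTRIBUTING.md):
//
//	go test -record   # run against the real backend and save RPCs to bigquery.replay
//	go test -short    # replay bigquery.replay without network access or credentials
//
// A plain "go test" runs against the real backend without recording.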

func TestIntegration_TableCreate(t *testing.T) {
	// Check that creating a record field with an empty schema is an error.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	table := dataset.Table("t_bad")
	schema := Schema{
		{Name: "rec", Type: RecordFieldType, Schema: Schema{}},
	}
	err := table.Create(context.Background(), &TableMetadata{
		Schema:         schema,
		ExpirationTime: testTableExpiration.Add(5 * time.Minute),
	})
	if err == nil {
		t.Fatal("want error, got nil")
	}
	if !hasStatusCode(err, http.StatusBadRequest) {
		t.Fatalf("want a 400 error, got %v", err)
	}
}

func TestIntegration_TableCreateView(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Test that standard SQL views work.
	view := dataset.Table("t_view_standardsql")
	query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`",
		dataset.ProjectID, dataset.DatasetID, table.TableID)
	err := view.Create(context.Background(), &TableMetadata{
		ViewQuery:      query,
		UseStandardSQL: true,
	})
	if err != nil {
		t.Fatalf("table.create: Did not expect an error, got: %v", err)
	}
	if err := view.Delete(ctx); err != nil {
		t.Fatal(err)
	}
}

func TestIntegration_TableMetadata(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)
	// Check table metadata.
	md, err := table.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// TODO(jba): check md more thoroughly.
	if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want {
		t.Errorf("metadata.FullID: got %q, want %q", got, want)
	}
	if got, want := md.Type, RegularTable; got != want {
		t.Errorf("metadata.Type: got %v, want %v", got, want)
	}
	if got, want := md.ExpirationTime, testTableExpiration; !got.Equal(want) {
		t.Errorf("metadata.ExpirationTime: got %v, want %v", got, want)
	}

	// Check that timePartitioning is nil by default.
	if md.TimePartitioning != nil {
		t.Errorf("metadata.TimePartitioning: got %v, want %v", md.TimePartitioning, nil)
	}

	// Create tables that have time partitioning.
	partitionCases := []struct {
		timePartitioning TimePartitioning
		wantExpiration   time.Duration
		wantField        string
	}{
		{TimePartitioning{}, time.Duration(0), ""},
		{TimePartitioning{Expiration: time.Second}, time.Second, ""},
		{
			TimePartitioning{
				Expiration: time.Second,
				Field:      "date",
			}, time.Second, "date"},
	}

	schema2 := Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "date", Type: DateFieldType},
	}

	clustering := &Clustering{
		Fields: []string{"name"},
	}

	// Currently, clustering depends on partitioning. Interleave testing of the two features.
	for i, c := range partitionCases {
		table := dataset.Table(fmt.Sprintf("t_metadata_partition_nocluster_%v", i))
		clusterTable := dataset.Table(fmt.Sprintf("t_metadata_partition_cluster_%v", i))

		// Create the unclustered, partitioned variant and get its metadata.
		err = table.Create(context.Background(), &TableMetadata{
			Schema:           schema2,
			TimePartitioning: &c.timePartitioning,
			ExpirationTime:   testTableExpiration,
		})
		if err != nil {
			t.Fatal(err)
		}
		defer table.Delete(ctx)
		md, err := table.Metadata(ctx)
		if err != nil {
			t.Fatal(err)
		}

		// Create the clustered variant and get its metadata.
		err = clusterTable.Create(context.Background(), &TableMetadata{
			Schema:           schema2,
			TimePartitioning: &c.timePartitioning,
			ExpirationTime:   testTableExpiration,
			Clustering:       clustering,
		})
		if err != nil {
			t.Fatal(err)
		}
		defer clusterTable.Delete(ctx)
		clusterMD, err := clusterTable.Metadata(ctx)
		if err != nil {
			t.Fatal(err)
		}

		for _, v := range []*TableMetadata{md, clusterMD} {
			got := v.TimePartitioning
			want := &TimePartitioning{
				Expiration: c.wantExpiration,
				Field:      c.wantField,
			}
			if !testutil.Equal(got, want) {
				t.Errorf("metadata.TimePartitioning: got %v, want %v", got, want)
			}
		}

		if md.Clustering != nil {
			t.Errorf("metadata.Clustering was not nil on unclustered table %s", table.TableID)
		}
		got := clusterMD.Clustering
		want := clustering
		if !testutil.Equal(got, want) {
			t.Errorf("metadata.Clustering: got %v, want %v", got, want)
		}
	}
}

func TestIntegration_DatasetCreate(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	ds := client.Dataset(datasetIDs.New())
	wmd := &DatasetMetadata{Name: "name", Location: "EU"}
	err := ds.Create(ctx, wmd)
	if err != nil {
		t.Fatal(err)
	}
	gmd, err := ds.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if got, want := gmd.Name, wmd.Name; got != want {
		t.Errorf("name: got %q, want %q", got, want)
	}
	if got, want := gmd.Location, wmd.Location; got != want {
		t.Errorf("location: got %q, want %q", got, want)
	}
	if err := ds.Delete(ctx); err != nil {
		t.Fatalf("deleting dataset %v: %v", ds, err)
	}
}

func TestIntegration_DatasetMetadata(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	md, err := dataset.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if got, want := md.FullID, fmt.Sprintf("%s:%s", dataset.ProjectID, dataset.DatasetID); got != want {
		t.Errorf("FullID: got %q, want %q", got, want)
	}
	jan2016 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
	if md.CreationTime.Before(jan2016) {
		t.Errorf("CreationTime: got %s, want > 2016-1-1", md.CreationTime)
	}
	if md.LastModifiedTime.Before(jan2016) {
		t.Errorf("LastModifiedTime: got %s, want > 2016-1-1", md.LastModifiedTime)
	}

	// Verify that we get a NotFound for a nonexistent dataset.
	_, err = client.Dataset("does_not_exist").Metadata(ctx)
	if err == nil || !hasStatusCode(err, http.StatusNotFound) {
		t.Errorf("got %v, want NotFound error", err)
	}
}

func TestIntegration_DatasetDelete(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	ds := client.Dataset(datasetIDs.New())
	if err := ds.Create(ctx, nil); err != nil {
		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
	}
	if err := ds.Delete(ctx); err != nil {
		t.Fatalf("deleting dataset %s: %v", ds.DatasetID, err)
	}
}

func TestIntegration_DatasetDeleteWithContents(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	ds := client.Dataset(datasetIDs.New())
	if err := ds.Create(ctx, nil); err != nil {
		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
	}
	table := ds.Table(tableIDs.New())
	if err := table.Create(ctx, nil); err != nil {
		t.Fatalf("creating table %s in dataset %s: %v", table.TableID, table.DatasetID, err)
	}
	// We expect failure here.
	if err := ds.Delete(ctx); err == nil {
		t.Fatalf("non-recursive delete of dataset %s succeeded unexpectedly", ds.DatasetID)
	}
	if err := ds.DeleteWithContents(ctx); err != nil {
		t.Fatalf("recursively deleting dataset %s: %v", ds.DatasetID, err)
	}
}

func TestIntegration_DatasetUpdateETags(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}

	check := func(md *DatasetMetadata, wantDesc, wantName string) {
		if md.Description != wantDesc {
			t.Errorf("description: got %q, want %q", md.Description, wantDesc)
		}
		if md.Name != wantName {
			t.Errorf("name: got %q, want %q", md.Name, wantName)
		}
	}

	ctx := context.Background()
	md, err := dataset.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if md.ETag == "" {
		t.Fatal("empty ETag")
	}
	// Write without ETag succeeds.
	desc := md.Description + "d2"
	name := md.Name + "n2"
	md2, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: desc, Name: name}, "")
	if err != nil {
		t.Fatal(err)
	}
	check(md2, desc, name)

	// Write with original ETag fails because of intervening write.
	_, err = dataset.Update(ctx, DatasetMetadataToUpdate{Description: "d", Name: "n"}, md.ETag)
	if err == nil {
		t.Fatal("got nil, want error")
	}

	// Write with most recent ETag succeeds.
	md3, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: "", Name: ""}, md2.ETag)
	if err != nil {
		t.Fatal(err)
	}
	check(md3, "", "")
}
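
// The checks above exercise the optimistic-concurrency contract of Update:
// passing the ETag from a previous Metadata or Update call makes the write
// conditional on the dataset being unchanged, while passing "" skips the
// check (the "blind write" used elsewhere in these tests). On a conflict,
// re-reading the metadata and retrying with the fresh ETag is the intended
// recovery, as the final Update with md2.ETag demonstrates.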

func TestIntegration_DatasetUpdateDefaultExpiration(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	md, err := dataset.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// Set the default expiration time.
	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Hour}, "")
	if err != nil {
		t.Fatal(err)
	}
	if md.DefaultTableExpiration != time.Hour {
		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
	}
	// Omitting DefaultTableExpiration doesn't change it.
	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{Name: "xyz"}, "")
	if err != nil {
		t.Fatal(err)
	}
	if md.DefaultTableExpiration != time.Hour {
		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
	}
	// Setting it to 0 deletes it (which looks like a 0 duration).
	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Duration(0)}, "")
	if err != nil {
		t.Fatal(err)
	}
	if md.DefaultTableExpiration != 0 {
		t.Fatalf("got %s, want 0", md.DefaultTableExpiration)
	}
}

func TestIntegration_DatasetUpdateAccess(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	md, err := dataset.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	origAccess := append([]*AccessEntry(nil), md.Access...)
	newEntry := &AccessEntry{
		Role:       ReaderRole,
		Entity:     "Joe@example.com",
		EntityType: UserEmailEntity,
	}
	newAccess := append(md.Access, newEntry)
	dm := DatasetMetadataToUpdate{Access: newAccess}
	md, err = dataset.Update(ctx, dm, md.ETag)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		_, err := dataset.Update(ctx, DatasetMetadataToUpdate{Access: origAccess}, md.ETag)
		if err != nil {
			t.Log("could not restore dataset access list")
		}
	}()
	if diff := testutil.Diff(md.Access, newAccess); diff != "" {
		t.Fatalf("got=-, want=+:\n%s", diff)
	}
}

func TestIntegration_DatasetUpdateLabels(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	md, err := dataset.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	var dm DatasetMetadataToUpdate
	dm.SetLabel("label", "value")
	md, err = dataset.Update(ctx, dm, "")
	if err != nil {
		t.Fatal(err)
	}
	if got, want := md.Labels["label"], "value"; got != want {
		t.Errorf("got %q, want %q", got, want)
	}
	dm = DatasetMetadataToUpdate{}
	dm.DeleteLabel("label")
	md, err = dataset.Update(ctx, dm, "")
	if err != nil {
		t.Fatal(err)
	}
	if _, ok := md.Labels["label"]; ok {
		t.Error("label still present after deletion")
	}
}

func TestIntegration_TableUpdateLabels(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	var tm TableMetadataToUpdate
	tm.SetLabel("label", "value")
	md, err := table.Update(ctx, tm, "")
	if err != nil {
		t.Fatal(err)
	}
	if got, want := md.Labels["label"], "value"; got != want {
		t.Errorf("got %q, want %q", got, want)
	}
	tm = TableMetadataToUpdate{}
	tm.DeleteLabel("label")
	md, err = table.Update(ctx, tm, "")
	if err != nil {
		t.Fatal(err)
	}
	if _, ok := md.Labels["label"]; ok {
		t.Error("label still present after deletion")
	}
}

func TestIntegration_Tables(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)
	wantName := table.FullyQualifiedName()

	// This test is flaky due to eventual consistency.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
		// Iterate over tables in the dataset.
		it := dataset.Tables(ctx)
		var tableNames []string
		for {
			tbl, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				return false, err
			}
			tableNames = append(tableNames, tbl.FullyQualifiedName())
		}
		// Other tests may be running with this dataset, so there might be more
		// than just our table in the list. So don't try for an exact match; just
		// make sure that our table is there somewhere.
		for _, tn := range tableNames {
			if tn == wantName {
				return true, nil
			}
		}
		return false, fmt.Errorf("got %v\nwant %s in the list", tableNames, wantName)
	})
	if err != nil {
		t.Fatal(err)
	}
}
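
// internal.Retry runs the closure above with exponential backoff until it
// returns stop == true or the context times out; a zero gax.Backoff falls
// back to the library's default delays. Returning (false, err) marks an
// attempt as retryable, which is how this test rides out the eventually
// consistent table listing.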

func TestIntegration_UploadAndRead(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Populate the table.
	upl := table.Uploader()
	var (
		wantRows  [][]Value
		saverRows []*ValuesSaver
	)
	for i, name := range []string{"a", "b", "c"} {
		row := []Value{name, []Value{int64(i)}, []Value{true}}
		wantRows = append(wantRows, row)
		saverRows = append(saverRows, &ValuesSaver{
			Schema:   schema,
			InsertID: name,
			Row:      row,
		})
	}
	if err := upl.Put(ctx, saverRows); err != nil {
		t.Fatal(putError(err))
	}

	// Wait until the data has been uploaded. This can take a few seconds, according
	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}
	// Read the table.
	checkRead(t, "upload", table.Read(ctx), wantRows)

	// Query the table.
	q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID))
	q.DefaultProjectID = dataset.ProjectID
	q.DefaultDatasetID = dataset.DatasetID

	rit, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "query", rit, wantRows)

	// Query the long way.
	job1, err := q.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if job1.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	job2, err := client.JobFromID(ctx, job1.ID())
	if err != nil {
		t.Fatal(err)
	}
	if job2.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	rit, err = job2.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "job.Read", rit, wantRows)

	// Get statistics.
	jobStatus, err := job2.Status(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if jobStatus.Statistics == nil {
		t.Fatal("jobStatus missing statistics")
	}
	if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok {
		t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details)
	}

	// Test reading directly into a []Value.
	valueLists, schema, _, err := readAll(table.Read(ctx))
	if err != nil {
		t.Fatal(err)
	}
	it := table.Read(ctx)
	for i, vl := range valueLists {
		var got []Value
		if err := it.Next(&got); err != nil {
			t.Fatal(err)
		}
		if !testutil.Equal(it.Schema, schema) {
			t.Fatalf("got schema %v, want %v", it.Schema, schema)
		}
		want := []Value(vl)
		if !testutil.Equal(got, want) {
			t.Errorf("%d: got %v, want %v", i, got, want)
		}
	}

	// Test reading into a map.
	it = table.Read(ctx)
	for _, vl := range valueLists {
		var vm map[string]Value
		if err := it.Next(&vm); err != nil {
			t.Fatal(err)
		}
		if got, want := len(vm), len(vl); got != want {
			t.Fatalf("valueMap len: got %d, want %d", got, want)
		}
		// With maps, structs become nested maps.
		vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]}
		for i, v := range vl {
			if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) {
				t.Errorf("%d, name=%s: got %#v, want %#v",
					i, schema[i].Name, got, want)
			}
		}
	}
}

type SubSubTestStruct struct {
	Integer int64
}

type SubTestStruct struct {
	String      string
	Record      SubSubTestStruct
	RecordArray []SubSubTestStruct
}

type TestStruct struct {
	Name      string
	Bytes     []byte
	Integer   int64
	Float     float64
	Boolean   bool
	Timestamp time.Time
	Date      civil.Date
	Time      civil.Time
	DateTime  civil.DateTime
	Numeric   *big.Rat

	StringArray    []string
	IntegerArray   []int64
	FloatArray     []float64
	BooleanArray   []bool
	TimestampArray []time.Time
	DateArray      []civil.Date
	TimeArray      []civil.Time
	DateTimeArray  []civil.DateTime
	NumericArray   []*big.Rat

	Record      SubTestStruct
	RecordArray []SubTestStruct
}

// Round times to the microsecond for comparison purposes.
var roundToMicros = cmp.Transformer("RoundToMicros",
	func(t time.Time) time.Time { return t.Round(time.Microsecond) })
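
// The rounding matters because BigQuery stores timestamps with microsecond
// precision: nanosecond components written from Go do not survive the round
// trip, and without this transformer cmp-based diffs would fail on that
// lost precision alone.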

func TestIntegration_UploadAndReadStructs(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	schema, err := InferSchema(TestStruct{})
	if err != nil {
		t.Fatal(err)
	}

	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
	ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC)
	dtm := civil.DateTime{Date: d, Time: tm}
	d2 := civil.Date{Year: 1994, Month: 5, Day: 15}
	tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0}
	ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC)
	dtm2 := civil.DateTime{Date: d2, Time: tm2}

	// Populate the table.
	upl := table.Uploader()
	want := []*TestStruct{
		{
			"a",
			[]byte("byte"),
			42,
			3.14,
			true,
			ts,
			d,
			tm,
			dtm,
			big.NewRat(57, 100),
			[]string{"a", "b"},
			[]int64{1, 2},
			[]float64{1, 1.41},
			[]bool{true, false},
			[]time.Time{ts, ts2},
			[]civil.Date{d, d2},
			[]civil.Time{tm, tm2},
			[]civil.DateTime{dtm, dtm2},
			[]*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)},
			SubTestStruct{
				"string",
				SubSubTestStruct{24},
				[]SubSubTestStruct{{1}, {2}},
			},
			[]SubTestStruct{
				{String: "empty"},
				{
					"full",
					SubSubTestStruct{1},
					[]SubSubTestStruct{{1}, {2}},
				},
			},
		},
		{
			Name:      "b",
			Bytes:     []byte("byte2"),
			Integer:   24,
			Float:     4.13,
			Boolean:   false,
			Timestamp: ts,
			Date:      d,
			Time:      tm,
			DateTime:  dtm,
			Numeric:   big.NewRat(4499, 10000),
		},
	}
	var savers []*StructSaver
	for _, s := range want {
		savers = append(savers, &StructSaver{Schema: schema, Struct: s})
	}
	if err := upl.Put(ctx, savers); err != nil {
		t.Fatal(putError(err))
	}

	// Wait until the data has been uploaded. This can take a few seconds, according
	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	// Test iteration with structs.
	it := table.Read(ctx)
	var got []*TestStruct
	for {
		var g TestStruct
		err := it.Next(&g)
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		got = append(got, &g)
	}
	sort.Sort(byName(got))

	// BigQuery does not elide nils. It reports an error for nil fields.
	for i, g := range got {
		if i >= len(want) {
			t.Errorf("%d: got %v, past end of want", i, pretty.Value(g))
		} else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" {
			t.Errorf("%d: got=-, want=+:\n%s", i, diff)
		}
	}
}

type byName []*TestStruct

func (b byName) Len() int           { return len(b) }
func (b byName) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name }

func TestIntegration_UploadAndReadNullable(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
	cdt := civil.DateTime{Date: testDate, Time: ctm}
	rat := big.NewRat(33, 100)
	testUploadAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema)))
	testUploadAndReadNullable(t, testStructNullable{
		String:    NullString{"x", true},
		Bytes:     []byte{1, 2, 3},
		Integer:   NullInt64{1, true},
		Float:     NullFloat64{2.3, true},
		Boolean:   NullBool{true, true},
		Timestamp: NullTimestamp{testTimestamp, true},
		Date:      NullDate{testDate, true},
		Time:      NullTime{ctm, true},
		DateTime:  NullDateTime{cdt, true},
		Numeric:   rat,
		Record:    &subNullable{X: NullInt64{4, true}},
	},
		[]Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, []Value{int64(4)}})
}

func testUploadAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
	ctx := context.Background()
	table := newTable(t, testStructNullableSchema)
	defer table.Delete(ctx)

	// Populate the table.
	upl := table.Uploader()
	if err := upl.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil {
		t.Fatal(putError(err))
	}
	// Wait until the data has been uploaded. This can take a few seconds, according
	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	// Read into a []Value.
	iter := table.Read(ctx)
	gotRows, _, _, err := readAll(iter)
	if err != nil {
		t.Fatal(err)
	}
	if len(gotRows) != 1 {
		t.Fatalf("got %d rows, want 1", len(gotRows))
	}
	if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" {
		t.Error(diff)
	}

	// Read into a struct.
	want := ts
	var sn testStructNullable
	it := table.Read(ctx)
	if err := it.Next(&sn); err != nil {
		t.Fatal(err)
	}
	if diff := testutil.Diff(sn, want, roundToMicros); diff != "" {
		t.Error(diff)
	}
}
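
// The NullXXX wrapper types used above (NullString, NullInt64, and so on)
// pair a value with a Valid flag, so a NULL column and a zero value remain
// distinguishable on both write and read; plain Go types cannot express
// that difference for NULLABLE columns.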

func TestIntegration_TableUpdate(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Test Update of non-schema fields.
	tm, err := table.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	wantDescription := tm.Description + "more"
	wantName := tm.Name + "more"
	wantExpiration := tm.ExpirationTime.Add(time.Hour * 24)
	got, err := table.Update(ctx, TableMetadataToUpdate{
		Description:    wantDescription,
		Name:           wantName,
		ExpirationTime: wantExpiration,
	}, tm.ETag)
	if err != nil {
		t.Fatal(err)
	}
	if got.Description != wantDescription {
		t.Errorf("Description: got %q, want %q", got.Description, wantDescription)
	}
	if got.Name != wantName {
		t.Errorf("Name: got %q, want %q", got.Name, wantName)
	}
	if got.ExpirationTime != wantExpiration {
		t.Errorf("ExpirationTime: got %q, want %q", got.ExpirationTime, wantExpiration)
	}
	if !testutil.Equal(got.Schema, schema) {
		t.Errorf("Schema: got %v, want %v", pretty.Value(got.Schema), pretty.Value(schema))
	}

	// Blind write succeeds.
	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "x"}, "")
	if err != nil {
		t.Fatal(err)
	}
	// Write with old etag fails.
	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "y"}, got.ETag)
	if err == nil {
		t.Fatal("Update with old ETag succeeded, wanted failure")
	}

	// Test schema update.
	// Columns can be added. schema2 is the same as schema, except for the
	// added column in the middle.
	nested := Schema{
		{Name: "nested", Type: BooleanFieldType},
		{Name: "other", Type: StringFieldType},
	}
	schema2 := Schema{
		schema[0],
		{Name: "rec2", Type: RecordFieldType, Schema: nested},
		schema[1],
		schema[2],
	}

	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: schema2}, "")
	if err != nil {
		t.Fatal(err)
	}

	// Wherever you add the column, it appears at the end.
	schema3 := Schema{schema2[0], schema2[2], schema2[3], schema2[1]}
	if !testutil.Equal(got.Schema, schema3) {
		t.Errorf("add field:\ngot %v\nwant %v",
			pretty.Value(got.Schema), pretty.Value(schema3))
	}

	// Updating with the empty schema succeeds, but is a no-op.
	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema{}}, "")
	if err != nil {
		t.Fatal(err)
	}
	if !testutil.Equal(got.Schema, schema3) {
		t.Errorf("empty schema:\ngot %v\nwant %v",
			pretty.Value(got.Schema), pretty.Value(schema3))
	}

	// Error cases when updating schema.
	for _, test := range []struct {
		desc   string
		fields Schema
	}{
		{"change from optional to required", Schema{
			{Name: "name", Type: StringFieldType, Required: true},
			schema3[1],
			schema3[2],
			schema3[3],
		}},
		{"add a required field", Schema{
			schema3[0], schema3[1], schema3[2], schema3[3],
			{Name: "req", Type: StringFieldType, Required: true},
		}},
		{"remove a field", Schema{schema3[0], schema3[1], schema3[2]}},
		{"remove a nested field", Schema{
			schema3[0], schema3[1], schema3[2],
			{Name: "rec2", Type: RecordFieldType, Schema: Schema{nested[0]}}}},
		{"remove all nested fields", Schema{
			schema3[0], schema3[1], schema3[2],
			{Name: "rec2", Type: RecordFieldType, Schema: Schema{}}}},
	} {
		_, err = table.Update(ctx, TableMetadataToUpdate{Schema: test.fields}, "")
		if err == nil {
			t.Errorf("%s: want error, got nil", test.desc)
		} else if !hasStatusCode(err, 400) {
			t.Errorf("%s: want 400, got %v", test.desc, err)
		}
	}
}
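
// Taken together, the error cases above pin down the schema update rules
// this test relies on: new columns may be added (and always land at the end,
// wherever they appear in the request), but columns cannot be removed, and
// neither existing nor new fields may be made REQUIRED.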

func TestIntegration_Load(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	// CSV data can't be loaded into a repeated field, so we use a different schema.
	table := newTable(t, Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "nums", Type: IntegerFieldType},
	})
	defer table.Delete(ctx)

	// Load the table from a reader.
	r := strings.NewReader("a,0\nb,1\nc,2\n")
	wantRows := [][]Value{
		{"a", int64(0)},
		{"b", int64(1)},
		{"c", int64(2)},
	}
	rs := NewReaderSource(r)
	loader := table.LoaderFrom(rs)
	loader.WriteDisposition = WriteTruncate
	loader.Labels = map[string]string{"test": "go"}
	job, err := loader.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if job.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	conf, err := job.Config()
	if err != nil {
		t.Fatal(err)
	}
	config, ok := conf.(*LoadConfig)
	if !ok {
		t.Fatalf("got %T, want LoadConfig", conf)
	}
	diff := testutil.Diff(config, &loader.LoadConfig,
		cmp.AllowUnexported(Table{}),
		cmpopts.IgnoreUnexported(Client{}, ReaderSource{}),
		// returned schema is at top level, not in the config
		cmpopts.IgnoreFields(FileConfig{}, "Schema"))
	if diff != "" {
		t.Errorf("got=-, want=+:\n%s", diff)
	}
	if err := wait(ctx, job); err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows)
}

func TestIntegration_DML(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec)
			VALUES ('a', [0], STRUCT<BOOL>(TRUE)),
				   ('b', [1], STRUCT<BOOL>(FALSE)),
				   ('c', [2], STRUCT<BOOL>(TRUE))`,
		table.DatasetID, table.TableID)
	if err := runDML(ctx, sql); err != nil {
		t.Fatal(err)
	}
	wantRows := [][]Value{
		{"a", []Value{int64(0)}, []Value{true}},
		{"b", []Value{int64(1)}, []Value{false}},
		{"c", []Value{int64(2)}, []Value{true}},
	}
	checkRead(t, "DML", table.Read(ctx), wantRows)
}

func runDML(ctx context.Context, sql string) error {
	// Retry insert; sometimes it fails with INTERNAL.
	return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
		ri, err := client.Query(sql).Read(ctx)
		if err != nil {
			if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
				return true, err // fail on 4xx
			}
			return false, err
		}
		// It is OK to try to iterate over DML results. The first call to Next
		// will return iterator.Done.
		err = ri.Next(nil)
		if err == nil {
			return true, errors.New("want iterator.Done on the first call, got nil")
		}
		if err == iterator.Done {
			return true, nil
		}
		if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
			return true, err // fail on 4xx
		}
		return false, err
	})
}
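
// runDML classifies failures by status code: any *googleapi.Error with a
// code below 500 is treated as permanent and stops the retry loop, while
// 5xx responses and other errors are retried, matching the transient
// INTERNAL failures mentioned in the comment above.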

func TestIntegration_TimeTypes(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	dtSchema := Schema{
		{Name: "d", Type: DateFieldType},
		{Name: "t", Type: TimeFieldType},
		{Name: "dt", Type: DateTimeFieldType},
		{Name: "ts", Type: TimestampFieldType},
	}
	table := newTable(t, dtSchema)
	defer table.Delete(ctx)

	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000}
	dtm := civil.DateTime{Date: d, Time: tm}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	wantRows := [][]Value{
		{d, tm, dtm, ts},
	}
	upl := table.Uploader()
	if err := upl.Put(ctx, []*ValuesSaver{
		{Schema: dtSchema, Row: wantRows[0]},
	}); err != nil {
		t.Fatal(putError(err))
	}
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	// SQL wants DATETIMEs with a space between date and time, but the service
	// returns them in RFC3339 form, with a "T" between.
	query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+
		"VALUES ('%s', '%s', '%s', '%s')",
		table.DatasetID, table.TableID,
		d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
	if err := runDML(ctx, query); err != nil {
		t.Fatal(err)
	}
	wantRows = append(wantRows, wantRows[0])
	checkRead(t, "TimeTypes", table.Read(ctx), wantRows)
}

func TestIntegration_StandardQuery(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	dtm := ts.Format("2006-01-02 15:04:05")

	// Constructs Value slices made up of int64s.
	ints := func(args ...int) []Value {
		vals := make([]Value, len(args))
		for i, arg := range args {
			vals[i] = int64(arg)
		}
		return vals
	}

	testCases := []struct {
		query   string
		wantRow []Value
	}{
		{"SELECT 1", ints(1)},
		{"SELECT 1.3", []Value{1.3}},
		{"SELECT CAST(1.3 AS NUMERIC)", []Value{big.NewRat(13, 10)}},
		{"SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}},
		{"SELECT TRUE", []Value{true}},
		{"SELECT 'ABC'", []Value{"ABC"}},
		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
		{fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}},
		{fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}},
		{fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}},
		{fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}},
		{fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}},
		{fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}},
		{"SELECT (1, 2)", []Value{ints(1, 2)}},
		{"SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}},
		{"SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}},
		{"SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}},
		{"SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}},
		{"SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}},
	}
	for _, c := range testCases {
		q := client.Query(c.query)
		it, err := q.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}
		checkRead(t, "StandardQuery", it, [][]Value{c.wantRow})
	}
}
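
// The cases above double as a reference for how this package decodes
// standard SQL types into Go: INT64 arrives as int64, FLOAT64 as float64,
// BYTES as []byte, NUMERIC as *big.Rat, the DATE/TIME/DATETIME family as
// the civil package's types, and both arrays and structs as nested []Value.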

func TestIntegration_LegacyQuery(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	dtm := ts.Format("2006-01-02 15:04:05")

	testCases := []struct {
		query   string
		wantRow []Value
	}{
		{"SELECT 1", []Value{int64(1)}},
		{"SELECT 1.3", []Value{1.3}},
		{"SELECT TRUE", []Value{true}},
		{"SELECT 'ABC'", []Value{"ABC"}},
		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
		{fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}},
		{fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}},
		{fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}},
	}
	for _, c := range testCases {
		q := client.Query(c.query)
		q.UseLegacySQL = true
		it, err := q.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}
		checkRead(t, "LegacyQuery", it, [][]Value{c.wantRow})
	}
}

func TestIntegration_QueryParameters(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
	rtm := tm
	rtm.Nanosecond = 3000 // round to microseconds
	dtm := civil.DateTime{Date: d, Time: tm}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	rat := big.NewRat(13, 10)

	type ss struct {
		String string
	}

	type s struct {
		Timestamp      time.Time
		StringArray    []string
		SubStruct      ss
		SubStructArray []ss
	}

	testCases := []struct {
		query      string
		parameters []QueryParameter
		wantRow    []Value
		wantConfig interface{}
	}{
		{
			"SELECT @val",
			[]QueryParameter{{"val", 1}},
			[]Value{int64(1)},
			int64(1),
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", 1.3}},
			[]Value{1.3},
			1.3,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", rat}},
			[]Value{rat},
			rat,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", true}},
			[]Value{true},
			true,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", "ABC"}},
			[]Value{"ABC"},
			"ABC",
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", []byte("foo")}},
			[]Value{[]byte("foo")},
			[]byte("foo"),
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", ts}},
			[]Value{ts},
			ts,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", []time.Time{ts, ts}}},
			[]Value{[]Value{ts, ts}},
			[]interface{}{ts, ts},
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", dtm}},
			[]Value{civil.DateTime{Date: d, Time: rtm}},
			civil.DateTime{Date: d, Time: rtm},
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", d}},
			[]Value{d},
			d,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", tm}},
			[]Value{rtm},
			rtm,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}},
			[]Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}},
			map[string]interface{}{
				"Timestamp":   ts,
				"StringArray": []interface{}{"a", "b"},
				"SubStruct":   map[string]interface{}{"String": "c"},
				"SubStructArray": []interface{}{
					map[string]interface{}{"String": "d"},
					map[string]interface{}{"String": "e"},
				},
			},
		},
		{
			"SELECT @val.Timestamp, @val.SubStruct.String",
			[]QueryParameter{{"val", s{Timestamp: ts, SubStruct: ss{"a"}}}},
			[]Value{ts, "a"},
			map[string]interface{}{
				"Timestamp":      ts,
				"SubStruct":      map[string]interface{}{"String": "a"},
				"StringArray":    nil,
				"SubStructArray": nil,
			},
		},
	}
	for _, c := range testCases {
		q := client.Query(c.query)
		q.Parameters = c.parameters
		job, err := q.Run(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if job.LastStatus() == nil {
			t.Error("no LastStatus")
		}
		it, err := job.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}
		checkRead(t, "QueryParameters", it, [][]Value{c.wantRow})
		config, err := job.Config()
		if err != nil {
			t.Fatal(err)
		}
		got := config.(*QueryConfig).Parameters[0].Value
		if !testutil.Equal(got, c.wantConfig) {
			t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
				c.parameters[0].Value, got, c.wantConfig)
		}
	}
}

func TestIntegration_QueryDryRun(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	q := client.Query("SELECT word from " + stdName + " LIMIT 10")
	q.DryRun = true
	job, err := q.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}

	s := job.LastStatus()
	if s.State != Done {
		t.Errorf("state is %v, expected Done", s.State)
	}
	if s.Statistics == nil {
		t.Fatal("no statistics")
	}
	if s.Statistics.Details.(*QueryStatistics).Schema == nil {
		t.Fatal("no schema")
	}
}

func TestIntegration_ExtractExternal(t *testing.T) {
	// Create a table, extract it to GCS, then query it externally.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	schema := Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "num", Type: IntegerFieldType},
	}
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Insert table data.
	sql := fmt.Sprintf(`INSERT %s.%s (name, num)
			VALUES ('a', 1), ('b', 2), ('c', 3)`,
		table.DatasetID, table.TableID)
	if err := runDML(ctx, sql); err != nil {
		t.Fatal(err)
	}
	// Extract to a GCS object as CSV.
	bucketName := testutil.ProjID()
	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
	gr := NewGCSReference(uri)
	gr.DestinationFormat = CSV
	e := table.ExtractorTo(gr)
	job, err := e.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	conf, err := job.Config()
	if err != nil {
		t.Fatal(err)
	}
	config, ok := conf.(*ExtractConfig)
	if !ok {
		t.Fatalf("got %T, want ExtractConfig", conf)
	}
	diff := testutil.Diff(config, &e.ExtractConfig,
		cmp.AllowUnexported(Table{}),
		cmpopts.IgnoreUnexported(Client{}))
	if diff != "" {
		t.Errorf("got=-, want=+:\n%s", diff)
	}
	if err := wait(ctx, job); err != nil {
		t.Fatal(err)
	}

	edc := &ExternalDataConfig{
		SourceFormat: CSV,
		SourceURIs:   []string{uri},
		Schema:       schema,
		Options:      &CSVOptions{SkipLeadingRows: 1},
	}
	// Query that CSV file directly.
	q := client.Query("SELECT * FROM csv")
	q.TableDefinitions = map[string]ExternalData{"csv": edc}
	wantRows := [][]Value{
		{"a", int64(1)},
		{"b", int64(2)},
		{"c", int64(3)},
	}
	iter, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "external query", iter, wantRows)

	// Make a table pointing to the file, and query it.
	// BigQuery does not allow a Table.Read on an external table.
	table = dataset.Table(tableIDs.New())
	err = table.Create(context.Background(), &TableMetadata{
		Schema:             schema,
		ExpirationTime:     testTableExpiration,
		ExternalDataConfig: edc,
	})
	if err != nil {
		t.Fatal(err)
	}
	q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
	iter, err = q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "external table", iter, wantRows)

	// While we're here, check that the table metadata is correct.
	md, err := table.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// One difference: since BigQuery returns the schema as part of the ordinary
	// table metadata, it does not populate ExternalDataConfig.Schema.
	md.ExternalDataConfig.Schema = md.Schema
	if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" {
		t.Errorf("got=-, want=+\n%s", diff)
	}
}

func TestIntegration_ReadNullIntoStruct(t *testing.T) {
	// Reading a null into a struct field should return an error (not panic).
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	upl := table.Uploader()
	row := &ValuesSaver{
		Schema: schema,
		Row:    []Value{nil, []Value{}, []Value{nil}},
	}
	if err := upl.Put(ctx, []*ValuesSaver{row}); err != nil {
		t.Fatal(putError(err))
	}
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	q := client.Query(fmt.Sprintf("select name from %s", table.TableID))
	q.DefaultProjectID = dataset.ProjectID
	q.DefaultDatasetID = dataset.DatasetID
	it, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	type S struct{ Name string }
	var s S
	if err := it.Next(&s); err == nil {
		t.Fatal("got nil, want error")
	}
}

const (
	stdName    = "`bigquery-public-data.samples.shakespeare`"
	legacyName = "[bigquery-public-data:samples.shakespeare]"
)

// These tests exploit the fact that the two SQL versions have different syntaxes for
// fully-qualified table names.
var useLegacySqlTests = []struct {
	t           string // name of table
	std, legacy bool   // use standard/legacy SQL
	err         bool   // do we expect an error?
}{
	{t: legacyName, std: false, legacy: true, err: false},
	{t: legacyName, std: true, legacy: false, err: true},
	{t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default
	{t: legacyName, std: true, legacy: true, err: true},
	{t: stdName, std: false, legacy: true, err: true},
	{t: stdName, std: true, legacy: false, err: false},
	{t: stdName, std: false, legacy: false, err: false}, // standard SQL is default
	{t: stdName, std: true, legacy: true, err: true},
}

func TestIntegration_QueryUseLegacySQL(t *testing.T) {
	// Test the UseLegacySQL and UseStandardSQL options for queries.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	for _, test := range useLegacySqlTests {
		q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t))
		q.UseStandardSQL = test.std
		q.UseLegacySQL = test.legacy
		_, err := q.Read(ctx)
		gotErr := err != nil
		if gotErr && !test.err {
			t.Errorf("%+v:\nunexpected error: %v", test, err)
		} else if !gotErr && test.err {
			t.Errorf("%+v:\nsucceeded, but want error", test)
		}
	}
}

func TestIntegration_TableUseLegacySQL(t *testing.T) {
	// Test UseLegacySQL and UseStandardSQL for Table.Create.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)
	for i, test := range useLegacySqlTests {
		view := dataset.Table(fmt.Sprintf("t_view_%d", i))
		tm := &TableMetadata{
			ViewQuery:      fmt.Sprintf("SELECT word from %s", test.t),
			UseStandardSQL: test.std,
			UseLegacySQL:   test.legacy,
		}
		err := view.Create(ctx, tm)
		gotErr := err != nil
		if gotErr && !test.err {
			t.Errorf("%+v:\nunexpected error: %v", test, err)
		} else if !gotErr && test.err {
			t.Errorf("%+v:\nsucceeded, but want error", test)
		}
		_ = view.Delete(ctx)
	}
}
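
// Both tests above rely on the same property: each dialect accepts only its
// own fully-qualified table syntax, so stdName (backquoted
// project.dataset.table) parses only under standard SQL and legacyName
// ([project:dataset.table]) only under legacy SQL. A query's success or
// failure therefore reveals which dialect actually ran.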

func TestIntegration_ListJobs(t *testing.T) {
	// It's difficult to test the list of jobs, because we can't easily
	// control what's in it. Also, there are many jobs in the test project,
	// and it takes considerable time to list them all.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	// About all we can do is list a few jobs.
	const max = 20
	var jobs []*Job
	it := client.Jobs(ctx)
	for {
		job, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		jobs = append(jobs, job)
		if len(jobs) >= max {
			break
		}
	}
	// We expect that there is at least one job in the last few months.
	if len(jobs) == 0 {
		t.Fatal("did not get any jobs")
	}
}

const tokyo = "asia-northeast1"

func TestIntegration_Location(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	client.Location = ""
	testLocation(t, tokyo)
	client.Location = tokyo
	defer func() {
		client.Location = ""
	}()
	testLocation(t, "")
}

func testLocation(t *testing.T, loc string) {
	ctx := context.Background()
	tokyoDataset := client.Dataset("tokyo")
	err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc})
	if err != nil && !hasStatusCode(err, 409) { // 409 = already exists
		t.Fatal(err)
	}
	md, err := tokyoDataset.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if md.Location != tokyo {
		t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo)
	}
	table := tokyoDataset.Table(tableIDs.New())
	err = table.Create(context.Background(), &TableMetadata{
		Schema: Schema{
			{Name: "name", Type: StringFieldType},
			{Name: "nums", Type: IntegerFieldType},
		},
		ExpirationTime: testTableExpiration,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer table.Delete(ctx)
	loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n")))
	loader.Location = loc
	job, err := loader.Run(ctx)
	if err != nil {
		t.Fatal("loader.Run", err)
	}
	if job.Location() != tokyo {
		t.Fatalf("job location: got %s, want %s", job.Location(), tokyo)
	}
	_, err = client.JobFromID(ctx, job.ID())
	if client.Location == "" && err == nil {
		t.Error("JobFromID with Tokyo job, no client location: want error, got nil")
	}
	if client.Location != "" && err != nil {
		t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err)
	}
	_, err = client.JobFromIDLocation(ctx, job.ID(), "US")
	if err == nil {
		t.Error("JobFromIDLocation with US: want error, got nil")
	}
	job2, err := client.JobFromIDLocation(ctx, job.ID(), loc)
	if loc == tokyo && err != nil {
		t.Errorf("loc=tokyo: %v", err)
	}
	if loc == "" && err == nil {
		t.Error("loc empty: got nil, want error")
	}
	if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) {
		t.Errorf("got id %s loc %s, want id %s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo)
	}
	if err := wait(ctx, job); err != nil {
		t.Fatal(err)
	}
	// Cancel should succeed even if the job is done.
	if err := job.Cancel(ctx); err != nil {
		t.Fatal(err)
	}

	q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
	q.Location = loc
	iter, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	wantRows := [][]Value{
		{"a", int64(0)},
		{"b", int64(1)},
		{"c", int64(2)},
	}
	checkRead(t, "location", iter, wantRows)

	table2 := tokyoDataset.Table(tableIDs.New())
	copier := table2.CopierFrom(table)
	copier.Location = loc
	if _, err := copier.Run(ctx); err != nil {
		t.Fatal(err)
	}
	bucketName := testutil.ProjID()
	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
	gr := NewGCSReference(uri)
	gr.DestinationFormat = CSV
	e := table.ExtractorTo(gr)
	e.Location = loc
	if _, err := e.Run(ctx); err != nil {
		t.Fatal(err)
	}
}

func TestIntegration_NumericErrors(t *testing.T) {
	// Verify that the service returns an error for a big.Rat that's too large.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	schema := Schema{{Name: "n", Type: NumericFieldType}}
	table := newTable(t, schema)
	defer table.Delete(ctx)
	tooBigRat := &big.Rat{}
	if _, ok := tooBigRat.SetString("1e40"); !ok {
		t.Fatal("big.Rat.SetString failed")
	}
	upl := table.Uploader()
	err := upl.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}})
	if err == nil {
		t.Fatal("got nil, want error")
	}
}
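
// NUMERIC is a fixed-point type with 38 digits of precision and 9 digits of
// scale, so a value on the order of 1e40 is out of range by construction;
// the test depends on the service rejecting it rather than on any
// client-side validation.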

func TestIntegration_QueryErrors(t *testing.T) {
	// Verify that a bad query returns an appropriate error.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	q := client.Query("blah blah broken")
	_, err := q.Read(ctx)
	if err == nil {
		t.Fatal("got nil, want error")
	}
	const want = "invalidQuery"
	if !strings.Contains(err.Error(), want) {
		t.Fatalf("got %q, want substring %q", err, want)
	}
}

func TestIntegration_Model(t *testing.T) {
	// Create an ML model.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	schema := Schema{
		{Name: "input", Type: IntegerFieldType},
		{Name: "label", Type: IntegerFieldType},
	}
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Insert table data.
	tableName := fmt.Sprintf("%s.%s", table.DatasetID, table.TableID)
	sql := fmt.Sprintf(`INSERT %s (input, label)
			VALUES (1, 0), (2, 1), (3, 0), (4, 1)`,
		tableName)
	wantNumRows := 4

	if err := runDML(ctx, sql); err != nil {
		t.Fatal(err)
	}

	model := dataset.Table("my_model")
	modelName := fmt.Sprintf("%s.%s", model.DatasetID, model.TableID)
	sql = fmt.Sprintf(`CREATE MODEL %s OPTIONS (model_type='logistic_reg') AS SELECT input, label FROM %s`,
		modelName, tableName)
	if err := runDML(ctx, sql); err != nil {
		t.Fatal(err)
	}
	defer model.Delete(ctx)

	sql = fmt.Sprintf(`SELECT * FROM ml.PREDICT(MODEL %s, TABLE %s)`, modelName, tableName)
	q := client.Query(sql)
	ri, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	rows, _, _, err := readAll(ri)
	if err != nil {
		t.Fatal(err)
	}
	if got := len(rows); got != wantNumRows {
		t.Fatalf("got %d rows in prediction table, want %d", got, wantNumRows)
	}
	iter := dataset.Tables(ctx)
	seen := false
	for {
		tbl, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		if tbl.TableID == "my_model" {
			seen = true
		}
	}
	if !seen {
		t.Fatal("model not listed in dataset")
	}
	if err := model.Delete(ctx); err != nil {
		t.Fatal(err)
	}
}

// newTable creates a new, temporary table with a unique name and the given schema.
func newTable(t *testing.T, s Schema) *Table {
	table := dataset.Table(tableIDs.New())
	err := table.Create(context.Background(), &TableMetadata{
		Schema:         s,
		ExpirationTime: testTableExpiration,
	})
	if err != nil {
		t.Fatal(err)
	}
	return table
}

func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) {
	if msg2, ok := compareRead(it, want, false); !ok {
		t.Errorf("%s: %s", msg, msg2)
	}
}

func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) {
	if msg2, ok := compareRead(it, want, true); !ok {
		t.Errorf("%s: %s", msg, msg2)
	}
}

func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) {
	got, _, totalRows, err := readAll(it)
	if err != nil {
		return err.Error(), false
	}
	if len(got) != len(want) {
		return fmt.Sprintf("got %d rows, want %d", len(got), len(want)), false
	}
	if compareTotalRows && len(got) != int(totalRows) {
		return fmt.Sprintf("got %d rows, but totalRows = %d", len(got), totalRows), false
	}
	sort.Sort(byCol0(got))
	for i, r := range got {
		gotRow := []Value(r)
		wantRow := want[i]
		if !testutil.Equal(gotRow, wantRow) {
			return fmt.Sprintf("#%d: got %#v, want %#v", i, gotRow, wantRow), false
		}
	}
	return "", true
}

func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) {
	var (
		rows      [][]Value
		schema    Schema
		totalRows uint64
	)
	for {
		var vals []Value
		err := it.Next(&vals)
		if err == iterator.Done {
			return rows, schema, totalRows, nil
		}
		if err != nil {
			return nil, nil, 0, err
		}
		rows = append(rows, vals)
		schema = it.Schema
		totalRows = it.TotalRows
	}
}

type byCol0 [][]Value

func (b byCol0) Len() int      { return len(b) }
func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byCol0) Less(i, j int) bool {
	switch a := b[i][0].(type) {
	case string:
		return a < b[j][0].(string)
	case civil.Date:
		return a.Before(b[j][0].(civil.Date))
	default:
		panic("unknown type")
	}
}

func hasStatusCode(err error, code int) bool {
	if e, ok := err.(*googleapi.Error); ok && e.Code == code {
		return true
	}
	return false
}

// wait polls the job until it is complete or an error is returned.
func wait(ctx context.Context, job *Job) error {
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	if status.Err() != nil {
		return fmt.Errorf("job status error: %#v", status.Err())
	}
	if status.Statistics == nil {
		return errors.New("nil Statistics")
	}
	if status.Statistics.EndTime.IsZero() {
		return errors.New("EndTime is zero")
	}
	if status.Statistics.Details == nil {
		return errors.New("nil Statistics.Details")
	}
	return nil
}

// waitForRow polls the table until it contains a row.
// TODO(jba): use internal.Retry.
func waitForRow(ctx context.Context, table *Table) error {
	for {
		it := table.Read(ctx)
		var v []Value
		err := it.Next(&v)
		if err == nil {
			return nil
		}
		if err != iterator.Done {
			return err
		}
		time.Sleep(1 * time.Second)
	}
}

func putError(err error) string {
	pme, ok := err.(PutMultiError)
	if !ok {
		return err.Error()
	}
	var msgs []string
	for _, err := range pme {
		msgs = append(msgs, err.Error())
	}
	return strings.Join(msgs, "\n")
}