1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "encoding/json" 20 "errors" 21 "flag" 22 "fmt" 23 "log" 24 "math/big" 25 "net/http" 26 "os" 27 "sort" 28 "strings" 29 "testing" 30 "time" 31 32 "cloud.google.com/go/civil" 33 "cloud.google.com/go/httpreplay" 34 "cloud.google.com/go/internal" 35 "cloud.google.com/go/internal/pretty" 36 "cloud.google.com/go/internal/testutil" 37 "cloud.google.com/go/internal/uid" 38 "cloud.google.com/go/storage" 39 "github.com/google/go-cmp/cmp" 40 "github.com/google/go-cmp/cmp/cmpopts" 41 gax "github.com/googleapis/gax-go/v2" 42 "google.golang.org/api/googleapi" 43 "google.golang.org/api/iterator" 44 "google.golang.org/api/option" 45) 46 47const replayFilename = "bigquery.replay" 48 49var record = flag.Bool("record", false, "record RPCs") 50 51var ( 52 client *Client 53 storageClient *storage.Client 54 dataset *Dataset 55 schema = Schema{ 56 {Name: "name", Type: StringFieldType}, 57 {Name: "nums", Type: IntegerFieldType, Repeated: true}, 58 {Name: "rec", Type: RecordFieldType, Schema: Schema{ 59 {Name: "bool", Type: BooleanFieldType}, 60 }}, 61 } 62 testTableExpiration time.Time 63 datasetIDs, tableIDs, modelIDs, routineIDs *uid.Space 64) 65 66// Note: integration tests cannot be run in parallel, because TestIntegration_Location 67// modifies the client. 68 69func TestMain(m *testing.M) { 70 cleanup := initIntegrationTest() 71 r := m.Run() 72 cleanup() 73 os.Exit(r) 74} 75 76func getClient(t *testing.T) *Client { 77 if client == nil { 78 t.Skip("Integration tests skipped") 79 } 80 return client 81} 82 83var grpcHeadersChecker = testutil.DefaultHeadersEnforcer() 84 85// If integration tests will be run, create a unique dataset for them. 86// Return a cleanup function. 87func initIntegrationTest() func() { 88 ctx := context.Background() 89 flag.Parse() // needed for testing.Short() 90 projID := testutil.ProjID() 91 switch { 92 case testing.Short() && *record: 93 log.Fatal("cannot combine -short and -record") 94 return func() {} 95 96 case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "": 97 // go test -short with a replay file will replay the integration tests if the 98 // environment variables are set. 99 log.Printf("replaying from %s", replayFilename) 100 httpreplay.DebugHeaders() 101 replayer, err := httpreplay.NewReplayer(replayFilename) 102 if err != nil { 103 log.Fatal(err) 104 } 105 var t time.Time 106 if err := json.Unmarshal(replayer.Initial(), &t); err != nil { 107 log.Fatal(err) 108 } 109 hc, err := replayer.Client(ctx) // no creds needed 110 if err != nil { 111 log.Fatal(err) 112 } 113 copts := append(grpcHeadersChecker.CallOptions(), option.WithHTTPClient(hc)) 114 client, err = NewClient(ctx, projID, copts...) 115 if err != nil { 116 log.Fatal(err) 117 } 118 storageClient, err = storage.NewClient(ctx, copts...) 119 if err != nil { 120 log.Fatal(err) 121 } 122 cleanup := initTestState(client, t) 123 return func() { 124 cleanup() 125 _ = replayer.Close() // No actionable error returned. 126 } 127 128 case testing.Short(): 129 // go test -short without a replay file skips the integration tests. 130 if testutil.CanReplay(replayFilename) && projID != "" { 131 log.Print("replay not supported for Go versions before 1.8") 132 } 133 client = nil 134 storageClient = nil 135 return func() {} 136 137 default: // Run integration tests against a real backend. 138 ts := testutil.TokenSource(ctx, Scope) 139 if ts == nil { 140 log.Println("Integration tests skipped. See CONTRIBUTING.md for details") 141 return func() {} 142 } 143 bqOpts := append(grpcHeadersChecker.CallOptions(), option.WithTokenSource(ts)) 144 sOpts := append(grpcHeadersChecker.CallOptions(), option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))) 145 cleanup := func() {} 146 now := time.Now().UTC() 147 if *record { 148 if !httpreplay.Supported() { 149 log.Print("record not supported for Go versions before 1.8") 150 } else { 151 nowBytes, err := json.Marshal(now) 152 if err != nil { 153 log.Fatal(err) 154 } 155 recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes) 156 if err != nil { 157 log.Fatalf("could not record: %v", err) 158 } 159 log.Printf("recording to %s", replayFilename) 160 hc, err := recorder.Client(ctx, bqOpts...) 161 if err != nil { 162 log.Fatal(err) 163 } 164 bqOpts = append(grpcHeadersChecker.CallOptions(), option.WithHTTPClient(hc)) 165 hc, err = recorder.Client(ctx, sOpts...) 166 if err != nil { 167 log.Fatal(err) 168 } 169 sOpts = append(grpcHeadersChecker.CallOptions(), option.WithHTTPClient(hc)) 170 cleanup = func() { 171 if err := recorder.Close(); err != nil { 172 log.Printf("saving recording: %v", err) 173 } 174 } 175 } 176 } 177 var err error 178 client, err = NewClient(ctx, projID, bqOpts...) 179 if err != nil { 180 log.Fatalf("NewClient: %v", err) 181 } 182 storageClient, err = storage.NewClient(ctx, sOpts...) 183 if err != nil { 184 log.Fatalf("storage.NewClient: %v", err) 185 } 186 c := initTestState(client, now) 187 return func() { c(); cleanup() } 188 } 189} 190 191func initTestState(client *Client, t time.Time) func() { 192 // BigQuery does not accept hyphens in dataset or table IDs, so we create IDs 193 // with underscores. 194 ctx := context.Background() 195 opts := &uid.Options{Sep: '_', Time: t} 196 datasetIDs = uid.NewSpace("dataset", opts) 197 tableIDs = uid.NewSpace("table", opts) 198 modelIDs = uid.NewSpace("model", opts) 199 routineIDs = uid.NewSpace("routine", opts) 200 testTableExpiration = t.Add(10 * time.Minute).Round(time.Second) 201 // For replayability, seed the random source with t. 202 Seed(t.UnixNano()) 203 204 dataset = client.Dataset(datasetIDs.New()) 205 if err := dataset.Create(ctx, nil); err != nil { 206 log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err) 207 } 208 return func() { 209 if err := dataset.DeleteWithContents(ctx); err != nil { 210 log.Printf("could not delete %s", dataset.DatasetID) 211 } 212 } 213} 214 215func TestIntegration_TableCreate(t *testing.T) { 216 // Check that creating a record field with an empty schema is an error. 217 if client == nil { 218 t.Skip("Integration tests skipped") 219 } 220 table := dataset.Table("t_bad") 221 schema := Schema{ 222 {Name: "rec", Type: RecordFieldType, Schema: Schema{}}, 223 } 224 err := table.Create(context.Background(), &TableMetadata{ 225 Schema: schema, 226 ExpirationTime: testTableExpiration.Add(5 * time.Minute), 227 }) 228 if err == nil { 229 t.Fatal("want error, got nil") 230 } 231 if !hasStatusCode(err, http.StatusBadRequest) { 232 t.Fatalf("want a 400 error, got %v", err) 233 } 234} 235 236func TestIntegration_TableCreateView(t *testing.T) { 237 if client == nil { 238 t.Skip("Integration tests skipped") 239 } 240 ctx := context.Background() 241 table := newTable(t, schema) 242 defer table.Delete(ctx) 243 244 // Test that standard SQL views work. 245 view := dataset.Table("t_view_standardsql") 246 query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`", 247 dataset.ProjectID, dataset.DatasetID, table.TableID) 248 err := view.Create(context.Background(), &TableMetadata{ 249 ViewQuery: query, 250 UseStandardSQL: true, 251 }) 252 if err != nil { 253 t.Fatalf("table.create: Did not expect an error, got: %v", err) 254 } 255 if err := view.Delete(ctx); err != nil { 256 t.Fatal(err) 257 } 258} 259 260func TestIntegration_TableMetadata(t *testing.T) { 261 262 if client == nil { 263 t.Skip("Integration tests skipped") 264 } 265 ctx := context.Background() 266 table := newTable(t, schema) 267 defer table.Delete(ctx) 268 // Check table metadata. 269 md, err := table.Metadata(ctx) 270 if err != nil { 271 t.Fatal(err) 272 } 273 // TODO(jba): check md more thorougly. 274 if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want { 275 t.Errorf("metadata.FullID: got %q, want %q", got, want) 276 } 277 if got, want := md.Type, RegularTable; got != want { 278 t.Errorf("metadata.Type: got %v, want %v", got, want) 279 } 280 if got, want := md.ExpirationTime, testTableExpiration; !got.Equal(want) { 281 t.Errorf("metadata.Type: got %v, want %v", got, want) 282 } 283 284 // Check that timePartitioning is nil by default 285 if md.TimePartitioning != nil { 286 t.Errorf("metadata.TimePartitioning: got %v, want %v", md.TimePartitioning, nil) 287 } 288 289 // Create tables that have time partitioning 290 partitionCases := []struct { 291 timePartitioning TimePartitioning 292 wantExpiration time.Duration 293 wantField string 294 wantPruneFilter bool 295 }{ 296 {TimePartitioning{}, time.Duration(0), "", false}, 297 {TimePartitioning{Expiration: time.Second}, time.Second, "", false}, 298 {TimePartitioning{RequirePartitionFilter: true}, time.Duration(0), "", true}, 299 { 300 TimePartitioning{ 301 Expiration: time.Second, 302 Field: "date", 303 RequirePartitionFilter: true, 304 }, time.Second, "date", true}, 305 } 306 307 schema2 := Schema{ 308 {Name: "name", Type: StringFieldType}, 309 {Name: "date", Type: DateFieldType}, 310 } 311 312 clustering := &Clustering{ 313 Fields: []string{"name"}, 314 } 315 316 // Currently, clustering depends on partitioning. Interleave testing of the two features. 317 for i, c := range partitionCases { 318 table := dataset.Table(fmt.Sprintf("t_metadata_partition_nocluster_%v", i)) 319 clusterTable := dataset.Table(fmt.Sprintf("t_metadata_partition_cluster_%v", i)) 320 321 // Create unclustered, partitioned variant and get metadata. 322 err = table.Create(context.Background(), &TableMetadata{ 323 Schema: schema2, 324 TimePartitioning: &c.timePartitioning, 325 ExpirationTime: testTableExpiration, 326 }) 327 if err != nil { 328 t.Fatal(err) 329 } 330 defer table.Delete(ctx) 331 md, err := table.Metadata(ctx) 332 if err != nil { 333 t.Fatal(err) 334 } 335 336 // Created clustered table and get metadata. 337 err = clusterTable.Create(context.Background(), &TableMetadata{ 338 Schema: schema2, 339 TimePartitioning: &c.timePartitioning, 340 ExpirationTime: testTableExpiration, 341 Clustering: clustering, 342 }) 343 if err != nil { 344 t.Fatal(err) 345 } 346 clusterMD, err := clusterTable.Metadata(ctx) 347 if err != nil { 348 t.Fatal(err) 349 } 350 351 for _, v := range []*TableMetadata{md, clusterMD} { 352 got := v.TimePartitioning 353 want := &TimePartitioning{ 354 Expiration: c.wantExpiration, 355 Field: c.wantField, 356 RequirePartitionFilter: c.wantPruneFilter, 357 } 358 if !testutil.Equal(got, want) { 359 t.Errorf("metadata.TimePartitioning: got %v, want %v", got, want) 360 } 361 // Manipulate RequirePartitionFilter at the table level. 362 mdUpdate := TableMetadataToUpdate{ 363 RequirePartitionFilter: !want.RequirePartitionFilter, 364 } 365 366 newmd, err := table.Update(ctx, mdUpdate, "") 367 if err != nil { 368 t.Errorf("failed to invert RequirePartitionFilter on %s: %v", table.FullyQualifiedName(), err) 369 } 370 if newmd.RequirePartitionFilter == want.RequirePartitionFilter { 371 t.Errorf("inverting table-level RequirePartitionFilter on %s failed, want %t got %t", table.FullyQualifiedName(), !want.RequirePartitionFilter, newmd.RequirePartitionFilter) 372 } 373 // Also verify that the clone of RequirePartitionFilter in the TimePartitioning message is consistent. 374 if newmd.RequirePartitionFilter != newmd.TimePartitioning.RequirePartitionFilter { 375 t.Errorf("inconsistent RequirePartitionFilter. Table: %t, TimePartitioning: %t", newmd.RequirePartitionFilter, newmd.TimePartitioning.RequirePartitionFilter) 376 } 377 378 } 379 380 if md.Clustering != nil { 381 t.Errorf("metadata.Clustering was not nil on unclustered table %s", table.TableID) 382 } 383 got := clusterMD.Clustering 384 want := clustering 385 if clusterMD.Clustering != clustering { 386 if !testutil.Equal(got, want) { 387 t.Errorf("metadata.Clustering: got %v, want %v", got, want) 388 } 389 } 390 } 391 392} 393 394func TestIntegration_RemoveTimePartitioning(t *testing.T) { 395 if client == nil { 396 t.Skip("Integration tests skipped") 397 } 398 ctx := context.Background() 399 table := dataset.Table(tableIDs.New()) 400 want := 24 * time.Hour 401 err := table.Create(ctx, &TableMetadata{ 402 ExpirationTime: testTableExpiration, 403 TimePartitioning: &TimePartitioning{ 404 Expiration: want, 405 }, 406 }) 407 if err != nil { 408 t.Fatal(err) 409 } 410 defer table.Delete(ctx) 411 412 md, err := table.Metadata(ctx) 413 if err != nil { 414 t.Fatal(err) 415 } 416 if got := md.TimePartitioning.Expiration; got != want { 417 t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got) 418 } 419 420 // Remove time partitioning expiration 421 md, err = table.Update(context.Background(), TableMetadataToUpdate{ 422 TimePartitioning: &TimePartitioning{Expiration: 0}, 423 }, md.ETag) 424 if err != nil { 425 t.Fatal(err) 426 } 427 428 want = time.Duration(0) 429 if got := md.TimePartitioning.Expiration; got != want { 430 t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got) 431 } 432} 433 434func TestIntegration_DatasetCreate(t *testing.T) { 435 if client == nil { 436 t.Skip("Integration tests skipped") 437 } 438 ctx := context.Background() 439 ds := client.Dataset(datasetIDs.New()) 440 wmd := &DatasetMetadata{Name: "name", Location: "EU"} 441 err := ds.Create(ctx, wmd) 442 if err != nil { 443 t.Fatal(err) 444 } 445 gmd, err := ds.Metadata(ctx) 446 if err != nil { 447 t.Fatal(err) 448 } 449 if got, want := gmd.Name, wmd.Name; got != want { 450 t.Errorf("name: got %q, want %q", got, want) 451 } 452 if got, want := gmd.Location, wmd.Location; got != want { 453 t.Errorf("location: got %q, want %q", got, want) 454 } 455 if err := ds.Delete(ctx); err != nil { 456 t.Fatalf("deleting dataset %v: %v", ds, err) 457 } 458} 459 460func TestIntegration_DatasetMetadata(t *testing.T) { 461 if client == nil { 462 t.Skip("Integration tests skipped") 463 } 464 ctx := context.Background() 465 md, err := dataset.Metadata(ctx) 466 if err != nil { 467 t.Fatal(err) 468 } 469 if got, want := md.FullID, fmt.Sprintf("%s:%s", dataset.ProjectID, dataset.DatasetID); got != want { 470 t.Errorf("FullID: got %q, want %q", got, want) 471 } 472 jan2016 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC) 473 if md.CreationTime.Before(jan2016) { 474 t.Errorf("CreationTime: got %s, want > 2016-1-1", md.CreationTime) 475 } 476 if md.LastModifiedTime.Before(jan2016) { 477 t.Errorf("LastModifiedTime: got %s, want > 2016-1-1", md.LastModifiedTime) 478 } 479 480 // Verify that we get a NotFound for a nonexistent dataset. 481 _, err = client.Dataset("does_not_exist").Metadata(ctx) 482 if err == nil || !hasStatusCode(err, http.StatusNotFound) { 483 t.Errorf("got %v, want NotFound error", err) 484 } 485} 486 487func TestIntegration_DatasetDelete(t *testing.T) { 488 if client == nil { 489 t.Skip("Integration tests skipped") 490 } 491 ctx := context.Background() 492 ds := client.Dataset(datasetIDs.New()) 493 if err := ds.Create(ctx, nil); err != nil { 494 t.Fatalf("creating dataset %s: %v", ds.DatasetID, err) 495 } 496 if err := ds.Delete(ctx); err != nil { 497 t.Fatalf("deleting dataset %s: %v", ds.DatasetID, err) 498 } 499} 500 501func TestIntegration_DatasetDeleteWithContents(t *testing.T) { 502 if client == nil { 503 t.Skip("Integration tests skipped") 504 } 505 ctx := context.Background() 506 ds := client.Dataset(datasetIDs.New()) 507 if err := ds.Create(ctx, nil); err != nil { 508 t.Fatalf("creating dataset %s: %v", ds.DatasetID, err) 509 } 510 table := ds.Table(tableIDs.New()) 511 if err := table.Create(ctx, nil); err != nil { 512 t.Fatalf("creating table %s in dataset %s: %v", table.TableID, table.DatasetID, err) 513 } 514 // We expect failure here 515 if err := ds.Delete(ctx); err == nil { 516 t.Fatalf("non-recursive delete of dataset %s succeeded unexpectedly.", ds.DatasetID) 517 } 518 if err := ds.DeleteWithContents(ctx); err != nil { 519 t.Fatalf("deleting recursively dataset %s: %v", ds.DatasetID, err) 520 } 521} 522 523func TestIntegration_DatasetUpdateETags(t *testing.T) { 524 if client == nil { 525 t.Skip("Integration tests skipped") 526 } 527 528 check := func(md *DatasetMetadata, wantDesc, wantName string) { 529 if md.Description != wantDesc { 530 t.Errorf("description: got %q, want %q", md.Description, wantDesc) 531 } 532 if md.Name != wantName { 533 t.Errorf("name: got %q, want %q", md.Name, wantName) 534 } 535 } 536 537 ctx := context.Background() 538 md, err := dataset.Metadata(ctx) 539 if err != nil { 540 t.Fatal(err) 541 } 542 if md.ETag == "" { 543 t.Fatal("empty ETag") 544 } 545 // Write without ETag succeeds. 546 desc := md.Description + "d2" 547 name := md.Name + "n2" 548 md2, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: desc, Name: name}, "") 549 if err != nil { 550 t.Fatal(err) 551 } 552 check(md2, desc, name) 553 554 // Write with original ETag fails because of intervening write. 555 _, err = dataset.Update(ctx, DatasetMetadataToUpdate{Description: "d", Name: "n"}, md.ETag) 556 if err == nil { 557 t.Fatal("got nil, want error") 558 } 559 560 // Write with most recent ETag succeeds. 561 md3, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: "", Name: ""}, md2.ETag) 562 if err != nil { 563 t.Fatal(err) 564 } 565 check(md3, "", "") 566} 567 568func TestIntegration_DatasetUpdateDefaultExpiration(t *testing.T) { 569 if client == nil { 570 t.Skip("Integration tests skipped") 571 } 572 ctx := context.Background() 573 _, err := dataset.Metadata(ctx) 574 if err != nil { 575 t.Fatal(err) 576 } 577 // Set the default expiration time. 578 md, err := dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Hour}, "") 579 if err != nil { 580 t.Fatal(err) 581 } 582 if md.DefaultTableExpiration != time.Hour { 583 t.Fatalf("got %s, want 1h", md.DefaultTableExpiration) 584 } 585 // Omitting DefaultTableExpiration doesn't change it. 586 md, err = dataset.Update(ctx, DatasetMetadataToUpdate{Name: "xyz"}, "") 587 if err != nil { 588 t.Fatal(err) 589 } 590 if md.DefaultTableExpiration != time.Hour { 591 t.Fatalf("got %s, want 1h", md.DefaultTableExpiration) 592 } 593 // Setting it to 0 deletes it (which looks like a 0 duration). 594 md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Duration(0)}, "") 595 if err != nil { 596 t.Fatal(err) 597 } 598 if md.DefaultTableExpiration != 0 { 599 t.Fatalf("got %s, want 0", md.DefaultTableExpiration) 600 } 601} 602 603func TestIntegration_DatasetUpdateAccess(t *testing.T) { 604 if client == nil { 605 t.Skip("Integration tests skipped") 606 } 607 ctx := context.Background() 608 md, err := dataset.Metadata(ctx) 609 if err != nil { 610 t.Fatal(err) 611 } 612 origAccess := append([]*AccessEntry(nil), md.Access...) 613 newEntry := &AccessEntry{ 614 Role: ReaderRole, 615 Entity: "Joe@example.com", 616 EntityType: UserEmailEntity, 617 } 618 newAccess := append(md.Access, newEntry) 619 dm := DatasetMetadataToUpdate{Access: newAccess} 620 md, err = dataset.Update(ctx, dm, md.ETag) 621 if err != nil { 622 t.Fatal(err) 623 } 624 defer func() { 625 _, err := dataset.Update(ctx, DatasetMetadataToUpdate{Access: origAccess}, md.ETag) 626 if err != nil { 627 t.Log("could not restore dataset access list") 628 } 629 }() 630 if diff := testutil.Diff(md.Access, newAccess); diff != "" { 631 t.Fatalf("got=-, want=+:\n%s", diff) 632 } 633} 634 635func TestIntegration_DatasetUpdateLabels(t *testing.T) { 636 if client == nil { 637 t.Skip("Integration tests skipped") 638 } 639 ctx := context.Background() 640 _, err := dataset.Metadata(ctx) 641 if err != nil { 642 t.Fatal(err) 643 } 644 var dm DatasetMetadataToUpdate 645 dm.SetLabel("label", "value") 646 md, err := dataset.Update(ctx, dm, "") 647 if err != nil { 648 t.Fatal(err) 649 } 650 if got, want := md.Labels["label"], "value"; got != want { 651 t.Errorf("got %q, want %q", got, want) 652 } 653 dm = DatasetMetadataToUpdate{} 654 dm.DeleteLabel("label") 655 md, err = dataset.Update(ctx, dm, "") 656 if err != nil { 657 t.Fatal(err) 658 } 659 if _, ok := md.Labels["label"]; ok { 660 t.Error("label still present after deletion") 661 } 662} 663 664func TestIntegration_TableUpdateLabels(t *testing.T) { 665 if client == nil { 666 t.Skip("Integration tests skipped") 667 } 668 ctx := context.Background() 669 table := newTable(t, schema) 670 defer table.Delete(ctx) 671 672 var tm TableMetadataToUpdate 673 tm.SetLabel("label", "value") 674 md, err := table.Update(ctx, tm, "") 675 if err != nil { 676 t.Fatal(err) 677 } 678 if got, want := md.Labels["label"], "value"; got != want { 679 t.Errorf("got %q, want %q", got, want) 680 } 681 tm = TableMetadataToUpdate{} 682 tm.DeleteLabel("label") 683 md, err = table.Update(ctx, tm, "") 684 if err != nil { 685 t.Fatal(err) 686 } 687 if _, ok := md.Labels["label"]; ok { 688 t.Error("label still present after deletion") 689 } 690} 691 692func TestIntegration_Tables(t *testing.T) { 693 if client == nil { 694 t.Skip("Integration tests skipped") 695 } 696 ctx := context.Background() 697 table := newTable(t, schema) 698 defer table.Delete(ctx) 699 wantName := table.FullyQualifiedName() 700 701 // This test is flaky due to eventual consistency. 702 ctx, cancel := context.WithTimeout(ctx, 10*time.Second) 703 defer cancel() 704 err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { 705 // Iterate over tables in the dataset. 706 it := dataset.Tables(ctx) 707 var tableNames []string 708 for { 709 tbl, err := it.Next() 710 if err == iterator.Done { 711 break 712 } 713 if err != nil { 714 return false, err 715 } 716 tableNames = append(tableNames, tbl.FullyQualifiedName()) 717 } 718 // Other tests may be running with this dataset, so there might be more 719 // than just our table in the list. So don't try for an exact match; just 720 // make sure that our table is there somewhere. 721 for _, tn := range tableNames { 722 if tn == wantName { 723 return true, nil 724 } 725 } 726 return false, fmt.Errorf("got %v\nwant %s in the list", tableNames, wantName) 727 }) 728 if err != nil { 729 t.Fatal(err) 730 } 731} 732 733func TestIntegration_InsertAndRead(t *testing.T) { 734 if client == nil { 735 t.Skip("Integration tests skipped") 736 } 737 ctx := context.Background() 738 table := newTable(t, schema) 739 defer table.Delete(ctx) 740 741 // Populate the table. 742 ins := table.Inserter() 743 var ( 744 wantRows [][]Value 745 saverRows []*ValuesSaver 746 ) 747 for i, name := range []string{"a", "b", "c"} { 748 row := []Value{name, []Value{int64(i)}, []Value{true}} 749 wantRows = append(wantRows, row) 750 saverRows = append(saverRows, &ValuesSaver{ 751 Schema: schema, 752 InsertID: name, 753 Row: row, 754 }) 755 } 756 if err := ins.Put(ctx, saverRows); err != nil { 757 t.Fatal(putError(err)) 758 } 759 760 // Wait until the data has been uploaded. This can take a few seconds, according 761 // to https://cloud.google.com/bigquery/streaming-data-into-bigquery. 762 if err := waitForRow(ctx, table); err != nil { 763 t.Fatal(err) 764 } 765 // Read the table. 766 checkRead(t, "upload", table.Read(ctx), wantRows) 767 768 // Query the table. 769 q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID)) 770 q.DefaultProjectID = dataset.ProjectID 771 q.DefaultDatasetID = dataset.DatasetID 772 773 rit, err := q.Read(ctx) 774 if err != nil { 775 t.Fatal(err) 776 } 777 checkRead(t, "query", rit, wantRows) 778 779 // Query the long way. 780 job1, err := q.Run(ctx) 781 if err != nil { 782 t.Fatal(err) 783 } 784 if job1.LastStatus() == nil { 785 t.Error("no LastStatus") 786 } 787 job2, err := client.JobFromID(ctx, job1.ID()) 788 if err != nil { 789 t.Fatal(err) 790 } 791 if job2.LastStatus() == nil { 792 t.Error("no LastStatus") 793 } 794 rit, err = job2.Read(ctx) 795 if err != nil { 796 t.Fatal(err) 797 } 798 checkRead(t, "job.Read", rit, wantRows) 799 800 // Get statistics. 801 jobStatus, err := job2.Status(ctx) 802 if err != nil { 803 t.Fatal(err) 804 } 805 if jobStatus.Statistics == nil { 806 t.Fatal("jobStatus missing statistics") 807 } 808 if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok { 809 t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details) 810 } 811 812 // Test reading directly into a []Value. 813 valueLists, schema, _, err := readAll(table.Read(ctx)) 814 if err != nil { 815 t.Fatal(err) 816 } 817 it := table.Read(ctx) 818 for i, vl := range valueLists { 819 var got []Value 820 if err := it.Next(&got); err != nil { 821 t.Fatal(err) 822 } 823 if !testutil.Equal(it.Schema, schema) { 824 t.Fatalf("got schema %v, want %v", it.Schema, schema) 825 } 826 want := []Value(vl) 827 if !testutil.Equal(got, want) { 828 t.Errorf("%d: got %v, want %v", i, got, want) 829 } 830 } 831 832 // Test reading into a map. 833 it = table.Read(ctx) 834 for _, vl := range valueLists { 835 var vm map[string]Value 836 if err := it.Next(&vm); err != nil { 837 t.Fatal(err) 838 } 839 if got, want := len(vm), len(vl); got != want { 840 t.Fatalf("valueMap len: got %d, want %d", got, want) 841 } 842 // With maps, structs become nested maps. 843 vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]} 844 for i, v := range vl { 845 if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) { 846 t.Errorf("%d, name=%s: got %#v, want %#v", 847 i, schema[i].Name, got, want) 848 } 849 } 850 } 851 852} 853 854type SubSubTestStruct struct { 855 Integer int64 856} 857 858type SubTestStruct struct { 859 String string 860 Record SubSubTestStruct 861 RecordArray []SubSubTestStruct 862} 863 864type TestStruct struct { 865 Name string 866 Bytes []byte 867 Integer int64 868 Float float64 869 Boolean bool 870 Timestamp time.Time 871 Date civil.Date 872 Time civil.Time 873 DateTime civil.DateTime 874 Numeric *big.Rat 875 Geography string 876 877 StringArray []string 878 IntegerArray []int64 879 FloatArray []float64 880 BooleanArray []bool 881 TimestampArray []time.Time 882 DateArray []civil.Date 883 TimeArray []civil.Time 884 DateTimeArray []civil.DateTime 885 NumericArray []*big.Rat 886 GeographyArray []string 887 888 Record SubTestStruct 889 RecordArray []SubTestStruct 890} 891 892// Round times to the microsecond for comparison purposes. 893var roundToMicros = cmp.Transformer("RoundToMicros", 894 func(t time.Time) time.Time { return t.Round(time.Microsecond) }) 895 896func TestIntegration_InsertAndReadStructs(t *testing.T) { 897 if client == nil { 898 t.Skip("Integration tests skipped") 899 } 900 schema, err := InferSchema(TestStruct{}) 901 if err != nil { 902 t.Fatal(err) 903 } 904 905 ctx := context.Background() 906 table := newTable(t, schema) 907 defer table.Delete(ctx) 908 909 d := civil.Date{Year: 2016, Month: 3, Day: 20} 910 tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000} 911 ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC) 912 dtm := civil.DateTime{Date: d, Time: tm} 913 d2 := civil.Date{Year: 1994, Month: 5, Day: 15} 914 tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0} 915 ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC) 916 dtm2 := civil.DateTime{Date: d2, Time: tm2} 917 g := "POINT(-122.350220 47.649154)" 918 g2 := "POINT(-122.0836791 37.421827)" 919 920 // Populate the table. 921 ins := table.Inserter() 922 want := []*TestStruct{ 923 { 924 "a", 925 []byte("byte"), 926 42, 927 3.14, 928 true, 929 ts, 930 d, 931 tm, 932 dtm, 933 big.NewRat(57, 100), 934 g, 935 []string{"a", "b"}, 936 []int64{1, 2}, 937 []float64{1, 1.41}, 938 []bool{true, false}, 939 []time.Time{ts, ts2}, 940 []civil.Date{d, d2}, 941 []civil.Time{tm, tm2}, 942 []civil.DateTime{dtm, dtm2}, 943 []*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)}, 944 []string{g, g2}, 945 SubTestStruct{ 946 "string", 947 SubSubTestStruct{24}, 948 []SubSubTestStruct{{1}, {2}}, 949 }, 950 []SubTestStruct{ 951 {String: "empty"}, 952 { 953 "full", 954 SubSubTestStruct{1}, 955 []SubSubTestStruct{{1}, {2}}, 956 }, 957 }, 958 }, 959 { 960 Name: "b", 961 Bytes: []byte("byte2"), 962 Integer: 24, 963 Float: 4.13, 964 Boolean: false, 965 Timestamp: ts, 966 Date: d, 967 Time: tm, 968 DateTime: dtm, 969 Numeric: big.NewRat(4499, 10000), 970 }, 971 } 972 var savers []*StructSaver 973 for _, s := range want { 974 savers = append(savers, &StructSaver{Schema: schema, Struct: s}) 975 } 976 if err := ins.Put(ctx, savers); err != nil { 977 t.Fatal(putError(err)) 978 } 979 980 // Wait until the data has been uploaded. This can take a few seconds, according 981 // to https://cloud.google.com/bigquery/streaming-data-into-bigquery. 982 if err := waitForRow(ctx, table); err != nil { 983 t.Fatal(err) 984 } 985 986 // Test iteration with structs. 987 it := table.Read(ctx) 988 var got []*TestStruct 989 for { 990 var g TestStruct 991 err := it.Next(&g) 992 if err == iterator.Done { 993 break 994 } 995 if err != nil { 996 t.Fatal(err) 997 } 998 got = append(got, &g) 999 } 1000 sort.Sort(byName(got)) 1001 1002 // BigQuery does not elide nils. It reports an error for nil fields. 1003 for i, g := range got { 1004 if i >= len(want) { 1005 t.Errorf("%d: got %v, past end of want", i, pretty.Value(g)) 1006 } else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" { 1007 t.Errorf("%d: got=-, want=+:\n%s", i, diff) 1008 } 1009 } 1010} 1011 1012type byName []*TestStruct 1013 1014func (b byName) Len() int { return len(b) } 1015func (b byName) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1016func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name } 1017 1018func TestIntegration_InsertAndReadNullable(t *testing.T) { 1019 if client == nil { 1020 t.Skip("Integration tests skipped") 1021 } 1022 ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000} 1023 cdt := civil.DateTime{Date: testDate, Time: ctm} 1024 rat := big.NewRat(33, 100) 1025 geo := "POINT(-122.198939 47.669865)" 1026 1027 // Nil fields in the struct. 1028 testInsertAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema))) 1029 1030 // Explicitly invalidate the Null* types within the struct. 1031 testInsertAndReadNullable(t, testStructNullable{ 1032 String: NullString{Valid: false}, 1033 Integer: NullInt64{Valid: false}, 1034 Float: NullFloat64{Valid: false}, 1035 Boolean: NullBool{Valid: false}, 1036 Timestamp: NullTimestamp{Valid: false}, 1037 Date: NullDate{Valid: false}, 1038 Time: NullTime{Valid: false}, 1039 DateTime: NullDateTime{Valid: false}, 1040 Geography: NullGeography{Valid: false}, 1041 }, 1042 make([]Value, len(testStructNullableSchema))) 1043 1044 // Populate the struct with values. 1045 testInsertAndReadNullable(t, testStructNullable{ 1046 String: NullString{"x", true}, 1047 Bytes: []byte{1, 2, 3}, 1048 Integer: NullInt64{1, true}, 1049 Float: NullFloat64{2.3, true}, 1050 Boolean: NullBool{true, true}, 1051 Timestamp: NullTimestamp{testTimestamp, true}, 1052 Date: NullDate{testDate, true}, 1053 Time: NullTime{ctm, true}, 1054 DateTime: NullDateTime{cdt, true}, 1055 Numeric: rat, 1056 Geography: NullGeography{geo, true}, 1057 Record: &subNullable{X: NullInt64{4, true}}, 1058 }, 1059 []Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, geo, []Value{int64(4)}}) 1060} 1061 1062func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) { 1063 ctx := context.Background() 1064 table := newTable(t, testStructNullableSchema) 1065 defer table.Delete(ctx) 1066 1067 // Populate the table. 1068 ins := table.Inserter() 1069 if err := ins.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil { 1070 t.Fatal(putError(err)) 1071 } 1072 // Wait until the data has been uploaded. This can take a few seconds, according 1073 // to https://cloud.google.com/bigquery/streaming-data-into-bigquery. 1074 if err := waitForRow(ctx, table); err != nil { 1075 t.Fatal(err) 1076 } 1077 1078 // Read into a []Value. 1079 iter := table.Read(ctx) 1080 gotRows, _, _, err := readAll(iter) 1081 if err != nil { 1082 t.Fatal(err) 1083 } 1084 if len(gotRows) != 1 { 1085 t.Fatalf("got %d rows, want 1", len(gotRows)) 1086 } 1087 if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" { 1088 t.Error(diff) 1089 } 1090 1091 // Read into a struct. 1092 want := ts 1093 var sn testStructNullable 1094 it := table.Read(ctx) 1095 if err := it.Next(&sn); err != nil { 1096 t.Fatal(err) 1097 } 1098 if diff := testutil.Diff(sn, want, roundToMicros); diff != "" { 1099 t.Error(diff) 1100 } 1101} 1102 1103func TestIntegration_TableUpdate(t *testing.T) { 1104 if client == nil { 1105 t.Skip("Integration tests skipped") 1106 } 1107 ctx := context.Background() 1108 table := newTable(t, schema) 1109 defer table.Delete(ctx) 1110 1111 // Test Update of non-schema fields. 1112 tm, err := table.Metadata(ctx) 1113 if err != nil { 1114 t.Fatal(err) 1115 } 1116 wantDescription := tm.Description + "more" 1117 wantName := tm.Name + "more" 1118 wantExpiration := tm.ExpirationTime.Add(time.Hour * 24) 1119 got, err := table.Update(ctx, TableMetadataToUpdate{ 1120 Description: wantDescription, 1121 Name: wantName, 1122 ExpirationTime: wantExpiration, 1123 }, tm.ETag) 1124 if err != nil { 1125 t.Fatal(err) 1126 } 1127 if got.Description != wantDescription { 1128 t.Errorf("Description: got %q, want %q", got.Description, wantDescription) 1129 } 1130 if got.Name != wantName { 1131 t.Errorf("Name: got %q, want %q", got.Name, wantName) 1132 } 1133 if got.ExpirationTime != wantExpiration { 1134 t.Errorf("ExpirationTime: got %q, want %q", got.ExpirationTime, wantExpiration) 1135 } 1136 if !testutil.Equal(got.Schema, schema) { 1137 t.Errorf("Schema: got %v, want %v", pretty.Value(got.Schema), pretty.Value(schema)) 1138 } 1139 1140 // Blind write succeeds. 1141 _, err = table.Update(ctx, TableMetadataToUpdate{Name: "x"}, "") 1142 if err != nil { 1143 t.Fatal(err) 1144 } 1145 // Write with old etag fails. 1146 _, err = table.Update(ctx, TableMetadataToUpdate{Name: "y"}, got.ETag) 1147 if err == nil { 1148 t.Fatal("Update with old ETag succeeded, wanted failure") 1149 } 1150 1151 // Test schema update. 1152 // Columns can be added. schema2 is the same as schema, except for the 1153 // added column in the middle. 1154 nested := Schema{ 1155 {Name: "nested", Type: BooleanFieldType}, 1156 {Name: "other", Type: StringFieldType}, 1157 } 1158 schema2 := Schema{ 1159 schema[0], 1160 {Name: "rec2", Type: RecordFieldType, Schema: nested}, 1161 schema[1], 1162 schema[2], 1163 } 1164 1165 got, err = table.Update(ctx, TableMetadataToUpdate{Schema: schema2}, "") 1166 if err != nil { 1167 t.Fatal(err) 1168 } 1169 1170 // Wherever you add the column, it appears at the end. 1171 schema3 := Schema{schema2[0], schema2[2], schema2[3], schema2[1]} 1172 if !testutil.Equal(got.Schema, schema3) { 1173 t.Errorf("add field:\ngot %v\nwant %v", 1174 pretty.Value(got.Schema), pretty.Value(schema3)) 1175 } 1176 1177 // Updating with the empty schema succeeds, but is a no-op. 1178 got, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema{}}, "") 1179 if err != nil { 1180 t.Fatal(err) 1181 } 1182 if !testutil.Equal(got.Schema, schema3) { 1183 t.Errorf("empty schema:\ngot %v\nwant %v", 1184 pretty.Value(got.Schema), pretty.Value(schema3)) 1185 } 1186 1187 // Error cases when updating schema. 1188 for _, test := range []struct { 1189 desc string 1190 fields Schema 1191 }{ 1192 {"change from optional to required", Schema{ 1193 {Name: "name", Type: StringFieldType, Required: true}, 1194 schema3[1], 1195 schema3[2], 1196 schema3[3], 1197 }}, 1198 {"add a required field", Schema{ 1199 schema3[0], schema3[1], schema3[2], schema3[3], 1200 {Name: "req", Type: StringFieldType, Required: true}, 1201 }}, 1202 {"remove a field", Schema{schema3[0], schema3[1], schema3[2]}}, 1203 {"remove a nested field", Schema{ 1204 schema3[0], schema3[1], schema3[2], 1205 {Name: "rec2", Type: RecordFieldType, Schema: Schema{nested[0]}}}}, 1206 {"remove all nested fields", Schema{ 1207 schema3[0], schema3[1], schema3[2], 1208 {Name: "rec2", Type: RecordFieldType, Schema: Schema{}}}}, 1209 } { 1210 _, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema(test.fields)}, "") 1211 if err == nil { 1212 t.Errorf("%s: want error, got nil", test.desc) 1213 } else if !hasStatusCode(err, 400) { 1214 t.Errorf("%s: want 400, got %v", test.desc, err) 1215 } 1216 } 1217} 1218 1219func TestIntegration_Load(t *testing.T) { 1220 if client == nil { 1221 t.Skip("Integration tests skipped") 1222 } 1223 ctx := context.Background() 1224 // CSV data can't be loaded into a repeated field, so we use a different schema. 1225 table := newTable(t, Schema{ 1226 {Name: "name", Type: StringFieldType}, 1227 {Name: "nums", Type: IntegerFieldType}, 1228 }) 1229 defer table.Delete(ctx) 1230 1231 // Load the table from a reader. 1232 r := strings.NewReader("a,0\nb,1\nc,2\n") 1233 wantRows := [][]Value{ 1234 {"a", int64(0)}, 1235 {"b", int64(1)}, 1236 {"c", int64(2)}, 1237 } 1238 rs := NewReaderSource(r) 1239 loader := table.LoaderFrom(rs) 1240 loader.WriteDisposition = WriteTruncate 1241 loader.Labels = map[string]string{"test": "go"} 1242 job, err := loader.Run(ctx) 1243 if err != nil { 1244 t.Fatal(err) 1245 } 1246 if job.LastStatus() == nil { 1247 t.Error("no LastStatus") 1248 } 1249 conf, err := job.Config() 1250 if err != nil { 1251 t.Fatal(err) 1252 } 1253 config, ok := conf.(*LoadConfig) 1254 if !ok { 1255 t.Fatalf("got %T, want LoadConfig", conf) 1256 } 1257 diff := testutil.Diff(config, &loader.LoadConfig, 1258 cmp.AllowUnexported(Table{}), 1259 cmpopts.IgnoreUnexported(Client{}, ReaderSource{}), 1260 // returned schema is at top level, not in the config 1261 cmpopts.IgnoreFields(FileConfig{}, "Schema")) 1262 if diff != "" { 1263 t.Errorf("got=-, want=+:\n%s", diff) 1264 } 1265 if err := wait(ctx, job); err != nil { 1266 t.Fatal(err) 1267 } 1268 checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows) 1269 1270} 1271 1272func TestIntegration_DML(t *testing.T) { 1273 if client == nil { 1274 t.Skip("Integration tests skipped") 1275 } 1276 ctx := context.Background() 1277 table := newTable(t, schema) 1278 defer table.Delete(ctx) 1279 1280 sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec) 1281 VALUES ('a', [0], STRUCT<BOOL>(TRUE)), 1282 ('b', [1], STRUCT<BOOL>(FALSE)), 1283 ('c', [2], STRUCT<BOOL>(TRUE))`, 1284 table.DatasetID, table.TableID) 1285 if err := runQueryJob(ctx, sql); err != nil { 1286 t.Fatal(err) 1287 } 1288 wantRows := [][]Value{ 1289 {"a", []Value{int64(0)}, []Value{true}}, 1290 {"b", []Value{int64(1)}, []Value{false}}, 1291 {"c", []Value{int64(2)}, []Value{true}}, 1292 } 1293 checkRead(t, "DML", table.Read(ctx), wantRows) 1294} 1295 1296// runQueryJob is useful for running queries where no row data is returned (DDL/DML). 1297func runQueryJob(ctx context.Context, sql string) error { 1298 return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { 1299 job, err := client.Query(sql).Run(ctx) 1300 if err != nil { 1301 if e, ok := err.(*googleapi.Error); ok && e.Code < 500 { 1302 return true, err // fail on 4xx 1303 } 1304 return false, err 1305 } 1306 _, err = job.Wait(ctx) 1307 if err != nil { 1308 if e, ok := err.(*googleapi.Error); ok && e.Code < 500 { 1309 return true, err // fail on 4xx 1310 } 1311 return false, err 1312 } 1313 return true, nil 1314 }) 1315} 1316 1317func TestIntegration_TimeTypes(t *testing.T) { 1318 if client == nil { 1319 t.Skip("Integration tests skipped") 1320 } 1321 ctx := context.Background() 1322 dtSchema := Schema{ 1323 {Name: "d", Type: DateFieldType}, 1324 {Name: "t", Type: TimeFieldType}, 1325 {Name: "dt", Type: DateTimeFieldType}, 1326 {Name: "ts", Type: TimestampFieldType}, 1327 } 1328 table := newTable(t, dtSchema) 1329 defer table.Delete(ctx) 1330 1331 d := civil.Date{Year: 2016, Month: 3, Day: 20} 1332 tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000} 1333 dtm := civil.DateTime{Date: d, Time: tm} 1334 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1335 wantRows := [][]Value{ 1336 {d, tm, dtm, ts}, 1337 } 1338 ins := table.Inserter() 1339 if err := ins.Put(ctx, []*ValuesSaver{ 1340 {Schema: dtSchema, Row: wantRows[0]}, 1341 }); err != nil { 1342 t.Fatal(putError(err)) 1343 } 1344 if err := waitForRow(ctx, table); err != nil { 1345 t.Fatal(err) 1346 } 1347 1348 // SQL wants DATETIMEs with a space between date and time, but the service 1349 // returns them in RFC3339 form, with a "T" between. 1350 query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+ 1351 "VALUES ('%s', '%s', '%s', '%s')", 1352 table.DatasetID, table.TableID, 1353 d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05")) 1354 if err := runQueryJob(ctx, query); err != nil { 1355 t.Fatal(err) 1356 } 1357 wantRows = append(wantRows, wantRows[0]) 1358 checkRead(t, "TimeTypes", table.Read(ctx), wantRows) 1359} 1360 1361func TestIntegration_StandardQuery(t *testing.T) { 1362 if client == nil { 1363 t.Skip("Integration tests skipped") 1364 } 1365 ctx := context.Background() 1366 1367 d := civil.Date{Year: 2016, Month: 3, Day: 20} 1368 tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0} 1369 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1370 dtm := ts.Format("2006-01-02 15:04:05") 1371 1372 // Constructs Value slices made up of int64s. 1373 ints := func(args ...int) []Value { 1374 vals := make([]Value, len(args)) 1375 for i, arg := range args { 1376 vals[i] = int64(arg) 1377 } 1378 return vals 1379 } 1380 1381 testCases := []struct { 1382 query string 1383 wantRow []Value 1384 }{ 1385 {"SELECT 1", ints(1)}, 1386 {"SELECT 1.3", []Value{1.3}}, 1387 {"SELECT CAST(1.3 AS NUMERIC)", []Value{big.NewRat(13, 10)}}, 1388 {"SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}}, 1389 {"SELECT TRUE", []Value{true}}, 1390 {"SELECT 'ABC'", []Value{"ABC"}}, 1391 {"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}}, 1392 {fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}}, 1393 {fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}}, 1394 {fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}}, 1395 {fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}}, 1396 {fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}}, 1397 {fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}}, 1398 {"SELECT (1, 2)", []Value{ints(1, 2)}}, 1399 {"SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}}, 1400 {"SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}}, 1401 {"SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}}, 1402 {"SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}}, 1403 {"SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}}, 1404 } 1405 for _, c := range testCases { 1406 q := client.Query(c.query) 1407 it, err := q.Read(ctx) 1408 if err != nil { 1409 t.Fatal(err) 1410 } 1411 checkRead(t, "StandardQuery", it, [][]Value{c.wantRow}) 1412 } 1413} 1414 1415func TestIntegration_LegacyQuery(t *testing.T) { 1416 if client == nil { 1417 t.Skip("Integration tests skipped") 1418 } 1419 ctx := context.Background() 1420 1421 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1422 dtm := ts.Format("2006-01-02 15:04:05") 1423 1424 testCases := []struct { 1425 query string 1426 wantRow []Value 1427 }{ 1428 {"SELECT 1", []Value{int64(1)}}, 1429 {"SELECT 1.3", []Value{1.3}}, 1430 {"SELECT TRUE", []Value{true}}, 1431 {"SELECT 'ABC'", []Value{"ABC"}}, 1432 {"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}}, 1433 {fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}}, 1434 {fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}}, 1435 {fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}}, 1436 } 1437 for _, c := range testCases { 1438 q := client.Query(c.query) 1439 q.UseLegacySQL = true 1440 it, err := q.Read(ctx) 1441 if err != nil { 1442 t.Fatal(err) 1443 } 1444 checkRead(t, "LegacyQuery", it, [][]Value{c.wantRow}) 1445 } 1446} 1447 1448func TestIntegration_QueryParameters(t *testing.T) { 1449 if client == nil { 1450 t.Skip("Integration tests skipped") 1451 } 1452 ctx := context.Background() 1453 1454 d := civil.Date{Year: 2016, Month: 3, Day: 20} 1455 tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008} 1456 rtm := tm 1457 rtm.Nanosecond = 3000 // round to microseconds 1458 dtm := civil.DateTime{Date: d, Time: tm} 1459 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1460 rat := big.NewRat(13, 10) 1461 1462 type ss struct { 1463 String string 1464 } 1465 1466 type s struct { 1467 Timestamp time.Time 1468 StringArray []string 1469 SubStruct ss 1470 SubStructArray []ss 1471 } 1472 1473 testCases := []struct { 1474 query string 1475 parameters []QueryParameter 1476 wantRow []Value 1477 wantConfig interface{} 1478 }{ 1479 { 1480 "SELECT @val", 1481 []QueryParameter{{"val", 1}}, 1482 []Value{int64(1)}, 1483 int64(1), 1484 }, 1485 { 1486 "SELECT @val", 1487 []QueryParameter{{"val", 1.3}}, 1488 []Value{1.3}, 1489 1.3, 1490 }, 1491 { 1492 "SELECT @val", 1493 []QueryParameter{{"val", rat}}, 1494 []Value{rat}, 1495 rat, 1496 }, 1497 { 1498 "SELECT @val", 1499 []QueryParameter{{"val", true}}, 1500 []Value{true}, 1501 true, 1502 }, 1503 { 1504 "SELECT @val", 1505 []QueryParameter{{"val", "ABC"}}, 1506 []Value{"ABC"}, 1507 "ABC", 1508 }, 1509 { 1510 "SELECT @val", 1511 []QueryParameter{{"val", []byte("foo")}}, 1512 []Value{[]byte("foo")}, 1513 []byte("foo"), 1514 }, 1515 { 1516 "SELECT @val", 1517 []QueryParameter{{"val", ts}}, 1518 []Value{ts}, 1519 ts, 1520 }, 1521 { 1522 "SELECT @val", 1523 []QueryParameter{{"val", []time.Time{ts, ts}}}, 1524 []Value{[]Value{ts, ts}}, 1525 []interface{}{ts, ts}, 1526 }, 1527 { 1528 "SELECT @val", 1529 []QueryParameter{{"val", dtm}}, 1530 []Value{civil.DateTime{Date: d, Time: rtm}}, 1531 civil.DateTime{Date: d, Time: rtm}, 1532 }, 1533 { 1534 "SELECT @val", 1535 []QueryParameter{{"val", d}}, 1536 []Value{d}, 1537 d, 1538 }, 1539 { 1540 "SELECT @val", 1541 []QueryParameter{{"val", tm}}, 1542 []Value{rtm}, 1543 rtm, 1544 }, 1545 { 1546 "SELECT @val", 1547 []QueryParameter{{"val", s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}}, 1548 []Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}}, 1549 map[string]interface{}{ 1550 "Timestamp": ts, 1551 "StringArray": []interface{}{"a", "b"}, 1552 "SubStruct": map[string]interface{}{"String": "c"}, 1553 "SubStructArray": []interface{}{ 1554 map[string]interface{}{"String": "d"}, 1555 map[string]interface{}{"String": "e"}, 1556 }, 1557 }, 1558 }, 1559 { 1560 "SELECT @val.Timestamp, @val.SubStruct.String", 1561 []QueryParameter{{"val", s{Timestamp: ts, SubStruct: ss{"a"}}}}, 1562 []Value{ts, "a"}, 1563 map[string]interface{}{ 1564 "Timestamp": ts, 1565 "SubStruct": map[string]interface{}{"String": "a"}, 1566 "StringArray": nil, 1567 "SubStructArray": nil, 1568 }, 1569 }, 1570 } 1571 for _, c := range testCases { 1572 q := client.Query(c.query) 1573 q.Parameters = c.parameters 1574 job, err := q.Run(ctx) 1575 if err != nil { 1576 t.Fatal(err) 1577 } 1578 if job.LastStatus() == nil { 1579 t.Error("no LastStatus") 1580 } 1581 it, err := job.Read(ctx) 1582 if err != nil { 1583 t.Fatal(err) 1584 } 1585 checkRead(t, "QueryParameters", it, [][]Value{c.wantRow}) 1586 config, err := job.Config() 1587 if err != nil { 1588 t.Fatal(err) 1589 } 1590 got := config.(*QueryConfig).Parameters[0].Value 1591 if !testutil.Equal(got, c.wantConfig) { 1592 t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)", 1593 c.parameters[0].Value, got, c.wantConfig) 1594 } 1595 } 1596} 1597 1598func TestIntegration_QueryDryRun(t *testing.T) { 1599 if client == nil { 1600 t.Skip("Integration tests skipped") 1601 } 1602 ctx := context.Background() 1603 q := client.Query("SELECT word from " + stdName + " LIMIT 10") 1604 q.DryRun = true 1605 job, err := q.Run(ctx) 1606 if err != nil { 1607 t.Fatal(err) 1608 } 1609 1610 s := job.LastStatus() 1611 if s.State != Done { 1612 t.Errorf("state is %v, expected Done", s.State) 1613 } 1614 if s.Statistics == nil { 1615 t.Fatal("no statistics") 1616 } 1617 if s.Statistics.Details.(*QueryStatistics).Schema == nil { 1618 t.Fatal("no schema") 1619 } 1620 if s.Statistics.Details.(*QueryStatistics).TotalBytesProcessedAccuracy == "" { 1621 t.Fatal("no cost accuracy") 1622 } 1623} 1624 1625func TestIntegration_ExtractExternal(t *testing.T) { 1626 // Create a table, extract it to GCS, then query it externally. 1627 if client == nil { 1628 t.Skip("Integration tests skipped") 1629 } 1630 ctx := context.Background() 1631 schema := Schema{ 1632 {Name: "name", Type: StringFieldType}, 1633 {Name: "num", Type: IntegerFieldType}, 1634 } 1635 table := newTable(t, schema) 1636 defer table.Delete(ctx) 1637 1638 // Insert table data. 1639 sql := fmt.Sprintf(`INSERT %s.%s (name, num) 1640 VALUES ('a', 1), ('b', 2), ('c', 3)`, 1641 table.DatasetID, table.TableID) 1642 if err := runQueryJob(ctx, sql); err != nil { 1643 t.Fatal(err) 1644 } 1645 // Extract to a GCS object as CSV. 1646 bucketName := testutil.ProjID() 1647 objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID) 1648 uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName) 1649 defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx) 1650 gr := NewGCSReference(uri) 1651 gr.DestinationFormat = CSV 1652 e := table.ExtractorTo(gr) 1653 job, err := e.Run(ctx) 1654 if err != nil { 1655 t.Fatal(err) 1656 } 1657 conf, err := job.Config() 1658 if err != nil { 1659 t.Fatal(err) 1660 } 1661 config, ok := conf.(*ExtractConfig) 1662 if !ok { 1663 t.Fatalf("got %T, want ExtractConfig", conf) 1664 } 1665 diff := testutil.Diff(config, &e.ExtractConfig, 1666 cmp.AllowUnexported(Table{}), 1667 cmpopts.IgnoreUnexported(Client{})) 1668 if diff != "" { 1669 t.Errorf("got=-, want=+:\n%s", diff) 1670 } 1671 if err := wait(ctx, job); err != nil { 1672 t.Fatal(err) 1673 } 1674 1675 edc := &ExternalDataConfig{ 1676 SourceFormat: CSV, 1677 SourceURIs: []string{uri}, 1678 Schema: schema, 1679 Options: &CSVOptions{ 1680 SkipLeadingRows: 1, 1681 // This is the default. Since we use edc as an expectation later on, 1682 // let's just be explicit. 1683 FieldDelimiter: ",", 1684 }, 1685 } 1686 // Query that CSV file directly. 1687 q := client.Query("SELECT * FROM csv") 1688 q.TableDefinitions = map[string]ExternalData{"csv": edc} 1689 wantRows := [][]Value{ 1690 {"a", int64(1)}, 1691 {"b", int64(2)}, 1692 {"c", int64(3)}, 1693 } 1694 iter, err := q.Read(ctx) 1695 if err != nil { 1696 t.Fatal(err) 1697 } 1698 checkReadAndTotalRows(t, "external query", iter, wantRows) 1699 1700 // Make a table pointing to the file, and query it. 1701 // BigQuery does not allow a Table.Read on an external table. 1702 table = dataset.Table(tableIDs.New()) 1703 err = table.Create(context.Background(), &TableMetadata{ 1704 Schema: schema, 1705 ExpirationTime: testTableExpiration, 1706 ExternalDataConfig: edc, 1707 }) 1708 if err != nil { 1709 t.Fatal(err) 1710 } 1711 q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID)) 1712 iter, err = q.Read(ctx) 1713 if err != nil { 1714 t.Fatal(err) 1715 } 1716 checkReadAndTotalRows(t, "external table", iter, wantRows) 1717 1718 // While we're here, check that the table metadata is correct. 1719 md, err := table.Metadata(ctx) 1720 if err != nil { 1721 t.Fatal(err) 1722 } 1723 // One difference: since BigQuery returns the schema as part of the ordinary 1724 // table metadata, it does not populate ExternalDataConfig.Schema. 1725 md.ExternalDataConfig.Schema = md.Schema 1726 if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" { 1727 t.Errorf("got=-, want=+\n%s", diff) 1728 } 1729} 1730 1731func TestIntegration_ReadNullIntoStruct(t *testing.T) { 1732 // Reading a null into a struct field should return an error (not panic). 1733 if client == nil { 1734 t.Skip("Integration tests skipped") 1735 } 1736 ctx := context.Background() 1737 table := newTable(t, schema) 1738 defer table.Delete(ctx) 1739 1740 ins := table.Inserter() 1741 row := &ValuesSaver{ 1742 Schema: schema, 1743 Row: []Value{nil, []Value{}, []Value{nil}}, 1744 } 1745 if err := ins.Put(ctx, []*ValuesSaver{row}); err != nil { 1746 t.Fatal(putError(err)) 1747 } 1748 if err := waitForRow(ctx, table); err != nil { 1749 t.Fatal(err) 1750 } 1751 1752 q := client.Query(fmt.Sprintf("select name from %s", table.TableID)) 1753 q.DefaultProjectID = dataset.ProjectID 1754 q.DefaultDatasetID = dataset.DatasetID 1755 it, err := q.Read(ctx) 1756 if err != nil { 1757 t.Fatal(err) 1758 } 1759 type S struct{ Name string } 1760 var s S 1761 if err := it.Next(&s); err == nil { 1762 t.Fatal("got nil, want error") 1763 } 1764} 1765 1766const ( 1767 stdName = "`bigquery-public-data.samples.shakespeare`" 1768 legacyName = "[bigquery-public-data:samples.shakespeare]" 1769) 1770 1771// These tests exploit the fact that the two SQL versions have different syntaxes for 1772// fully-qualified table names. 1773var useLegacySQLTests = []struct { 1774 t string // name of table 1775 std, legacy bool // use standard/legacy SQL 1776 err bool // do we expect an error? 1777}{ 1778 {t: legacyName, std: false, legacy: true, err: false}, 1779 {t: legacyName, std: true, legacy: false, err: true}, 1780 {t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default 1781 {t: legacyName, std: true, legacy: true, err: true}, 1782 {t: stdName, std: false, legacy: true, err: true}, 1783 {t: stdName, std: true, legacy: false, err: false}, 1784 {t: stdName, std: false, legacy: false, err: false}, // standard SQL is default 1785 {t: stdName, std: true, legacy: true, err: true}, 1786} 1787 1788func TestIntegration_QueryUseLegacySQL(t *testing.T) { 1789 // Test the UseLegacySQL and UseStandardSQL options for queries. 1790 if client == nil { 1791 t.Skip("Integration tests skipped") 1792 } 1793 ctx := context.Background() 1794 for _, test := range useLegacySQLTests { 1795 q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t)) 1796 q.UseStandardSQL = test.std 1797 q.UseLegacySQL = test.legacy 1798 _, err := q.Read(ctx) 1799 gotErr := err != nil 1800 if gotErr && !test.err { 1801 t.Errorf("%+v:\nunexpected error: %v", test, err) 1802 } else if !gotErr && test.err { 1803 t.Errorf("%+v:\nsucceeded, but want error", test) 1804 } 1805 } 1806} 1807 1808func TestIntegration_TableUseLegacySQL(t *testing.T) { 1809 // Test UseLegacySQL and UseStandardSQL for Table.Create. 1810 if client == nil { 1811 t.Skip("Integration tests skipped") 1812 } 1813 ctx := context.Background() 1814 table := newTable(t, schema) 1815 defer table.Delete(ctx) 1816 for i, test := range useLegacySQLTests { 1817 view := dataset.Table(fmt.Sprintf("t_view_%d", i)) 1818 tm := &TableMetadata{ 1819 ViewQuery: fmt.Sprintf("SELECT word from %s", test.t), 1820 UseStandardSQL: test.std, 1821 UseLegacySQL: test.legacy, 1822 } 1823 err := view.Create(ctx, tm) 1824 gotErr := err != nil 1825 if gotErr && !test.err { 1826 t.Errorf("%+v:\nunexpected error: %v", test, err) 1827 } else if !gotErr && test.err { 1828 t.Errorf("%+v:\nsucceeded, but want error", test) 1829 } 1830 _ = view.Delete(ctx) 1831 } 1832} 1833 1834func TestIntegration_ListJobs(t *testing.T) { 1835 // It's difficult to test the list of jobs, because we can't easily 1836 // control what's in it. Also, there are many jobs in the test project, 1837 // and it takes considerable time to list them all. 1838 if client == nil { 1839 t.Skip("Integration tests skipped") 1840 } 1841 ctx := context.Background() 1842 1843 // About all we can do is list a few jobs. 1844 const max = 20 1845 var jobs []*Job 1846 it := client.Jobs(ctx) 1847 for { 1848 job, err := it.Next() 1849 if err == iterator.Done { 1850 break 1851 } 1852 if err != nil { 1853 t.Fatal(err) 1854 } 1855 jobs = append(jobs, job) 1856 if len(jobs) >= max { 1857 break 1858 } 1859 } 1860 // We expect that there is at least one job in the last few months. 1861 if len(jobs) == 0 { 1862 t.Fatal("did not get any jobs") 1863 } 1864} 1865 1866const tokyo = "asia-northeast1" 1867 1868func TestIntegration_Location(t *testing.T) { 1869 if client == nil { 1870 t.Skip("Integration tests skipped") 1871 } 1872 client.Location = "" 1873 testLocation(t, tokyo) 1874 client.Location = tokyo 1875 defer func() { 1876 client.Location = "" 1877 }() 1878 testLocation(t, "") 1879} 1880 1881func testLocation(t *testing.T, loc string) { 1882 ctx := context.Background() 1883 tokyoDataset := client.Dataset("tokyo") 1884 err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc}) 1885 if err != nil && !hasStatusCode(err, 409) { // 409 = already exists 1886 t.Fatal(err) 1887 } 1888 md, err := tokyoDataset.Metadata(ctx) 1889 if err != nil { 1890 t.Fatal(err) 1891 } 1892 if md.Location != tokyo { 1893 t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo) 1894 } 1895 table := tokyoDataset.Table(tableIDs.New()) 1896 err = table.Create(context.Background(), &TableMetadata{ 1897 Schema: Schema{ 1898 {Name: "name", Type: StringFieldType}, 1899 {Name: "nums", Type: IntegerFieldType}, 1900 }, 1901 ExpirationTime: testTableExpiration, 1902 }) 1903 if err != nil { 1904 t.Fatal(err) 1905 } 1906 defer table.Delete(ctx) 1907 loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n"))) 1908 loader.Location = loc 1909 job, err := loader.Run(ctx) 1910 if err != nil { 1911 t.Fatal("loader.Run", err) 1912 } 1913 if job.Location() != tokyo { 1914 t.Fatalf("job location: got %s, want %s", job.Location(), tokyo) 1915 } 1916 _, err = client.JobFromID(ctx, job.ID()) 1917 if client.Location == "" && err == nil { 1918 t.Error("JobFromID with Tokyo job, no client location: want error, got nil") 1919 } 1920 if client.Location != "" && err != nil { 1921 t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err) 1922 } 1923 _, err = client.JobFromIDLocation(ctx, job.ID(), "US") 1924 if err == nil { 1925 t.Error("JobFromIDLocation with US: want error, got nil") 1926 } 1927 job2, err := client.JobFromIDLocation(ctx, job.ID(), loc) 1928 if loc == tokyo && err != nil { 1929 t.Errorf("loc=tokyo: %v", err) 1930 } 1931 if loc == "" && err == nil { 1932 t.Error("loc empty: got nil, want error") 1933 } 1934 if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) { 1935 t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo) 1936 } 1937 if err := wait(ctx, job); err != nil { 1938 t.Fatal(err) 1939 } 1940 // Cancel should succeed even if the job is done. 1941 if err := job.Cancel(ctx); err != nil { 1942 t.Fatal(err) 1943 } 1944 1945 q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID)) 1946 q.Location = loc 1947 iter, err := q.Read(ctx) 1948 if err != nil { 1949 t.Fatal(err) 1950 } 1951 wantRows := [][]Value{ 1952 {"a", int64(0)}, 1953 {"b", int64(1)}, 1954 {"c", int64(2)}, 1955 } 1956 checkRead(t, "location", iter, wantRows) 1957 1958 table2 := tokyoDataset.Table(tableIDs.New()) 1959 copier := table2.CopierFrom(table) 1960 copier.Location = loc 1961 if _, err := copier.Run(ctx); err != nil { 1962 t.Fatal(err) 1963 } 1964 bucketName := testutil.ProjID() 1965 objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID) 1966 uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName) 1967 defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx) 1968 gr := NewGCSReference(uri) 1969 gr.DestinationFormat = CSV 1970 e := table.ExtractorTo(gr) 1971 e.Location = loc 1972 if _, err := e.Run(ctx); err != nil { 1973 t.Fatal(err) 1974 } 1975} 1976 1977func TestIntegration_NumericErrors(t *testing.T) { 1978 // Verify that the service returns an error for a big.Rat that's too large. 1979 if client == nil { 1980 t.Skip("Integration tests skipped") 1981 } 1982 ctx := context.Background() 1983 schema := Schema{{Name: "n", Type: NumericFieldType}} 1984 table := newTable(t, schema) 1985 defer table.Delete(ctx) 1986 tooBigRat := &big.Rat{} 1987 if _, ok := tooBigRat.SetString("1e40"); !ok { 1988 t.Fatal("big.Rat.SetString failed") 1989 } 1990 ins := table.Inserter() 1991 err := ins.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}}) 1992 if err == nil { 1993 t.Fatal("got nil, want error") 1994 } 1995} 1996 1997func TestIntegration_QueryErrors(t *testing.T) { 1998 // Verify that a bad query returns an appropriate error. 1999 if client == nil { 2000 t.Skip("Integration tests skipped") 2001 } 2002 ctx := context.Background() 2003 q := client.Query("blah blah broken") 2004 _, err := q.Read(ctx) 2005 const want = "invalidQuery" 2006 if !strings.Contains(err.Error(), want) { 2007 t.Fatalf("got %q, want substring %q", err, want) 2008 } 2009} 2010 2011func TestIntegration_ModelLifecycle(t *testing.T) { 2012 if client == nil { 2013 t.Skip("Integration tests skipped") 2014 } 2015 ctx := context.Background() 2016 2017 // Create a model via a CREATE MODEL query 2018 modelID := modelIDs.New() 2019 model := dataset.Model(modelID) 2020 modelRef := fmt.Sprintf("%s.%s.%s", dataset.ProjectID, dataset.DatasetID, modelID) 2021 2022 sql := fmt.Sprintf(` 2023 CREATE MODEL `+"`%s`"+` 2024 OPTIONS ( 2025 model_type='linear_reg', 2026 max_iteration=1, 2027 learn_rate=0.4, 2028 learn_rate_strategy='constant' 2029 ) AS ( 2030 SELECT 'a' AS f1, 2.0 AS label 2031 UNION ALL 2032 SELECT 'b' AS f1, 3.8 AS label 2033 )`, modelRef) 2034 if err := runQueryJob(ctx, sql); err != nil { 2035 t.Fatal(err) 2036 } 2037 defer model.Delete(ctx) 2038 2039 // Get the model metadata. 2040 curMeta, err := model.Metadata(ctx) 2041 if err != nil { 2042 t.Fatalf("couldn't get metadata: %v", err) 2043 } 2044 2045 want := "LINEAR_REGRESSION" 2046 if curMeta.Type != want { 2047 t.Errorf("Model type mismatch. Want %s got %s", curMeta.Type, want) 2048 } 2049 2050 // Ensure training metadata is available. 2051 runs := curMeta.RawTrainingRuns() 2052 if runs == nil { 2053 t.Errorf("training runs unpopulated.") 2054 } 2055 labelCols, err := curMeta.RawLabelColumns() 2056 if err != nil { 2057 t.Fatalf("failed to get label cols: %v", err) 2058 } 2059 if labelCols == nil { 2060 t.Errorf("label column information unpopulated.") 2061 } 2062 featureCols, err := curMeta.RawFeatureColumns() 2063 if err != nil { 2064 t.Fatalf("failed to get feature cols: %v", err) 2065 } 2066 if featureCols == nil { 2067 t.Errorf("feature column information unpopulated.") 2068 } 2069 2070 // Update mutable fields via API. 2071 expiry := time.Now().Add(24 * time.Hour).Truncate(time.Millisecond) 2072 2073 upd := ModelMetadataToUpdate{ 2074 Description: "new", 2075 Name: "friendly", 2076 ExpirationTime: expiry, 2077 } 2078 2079 newMeta, err := model.Update(ctx, upd, curMeta.ETag) 2080 if err != nil { 2081 t.Fatalf("failed to update: %v", err) 2082 } 2083 2084 want = "new" 2085 if newMeta.Description != want { 2086 t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want) 2087 } 2088 want = "friendly" 2089 if newMeta.Name != want { 2090 t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want) 2091 } 2092 if newMeta.ExpirationTime != expiry { 2093 t.Fatalf("ExpirationTime not updated. got %v want %v", newMeta.ExpirationTime, expiry) 2094 } 2095 2096 // Ensure presence when enumerating the model list. 2097 it := dataset.Models(ctx) 2098 seen := false 2099 for { 2100 mdl, err := it.Next() 2101 if err == iterator.Done { 2102 break 2103 } 2104 if err != nil { 2105 t.Fatal(err) 2106 } 2107 if mdl.ModelID == modelID { 2108 seen = true 2109 } 2110 } 2111 if !seen { 2112 t.Fatal("model not listed in dataset") 2113 } 2114 2115 // Delete the model. 2116 if err := model.Delete(ctx); err != nil { 2117 t.Fatalf("failed to delete model: %v", err) 2118 } 2119} 2120 2121func TestIntegration_RoutineScalarUDF(t *testing.T) { 2122 if client == nil { 2123 t.Skip("Integration tests skipped") 2124 } 2125 ctx := context.Background() 2126 2127 // Create a scalar UDF routine via API. 2128 routineID := routineIDs.New() 2129 routine := dataset.Routine(routineID) 2130 err := routine.Create(ctx, &RoutineMetadata{ 2131 Type: "SCALAR_FUNCTION", 2132 Language: "SQL", 2133 Body: "x * 3", 2134 Arguments: []*RoutineArgument{ 2135 { 2136 Name: "x", 2137 DataType: &StandardSQLDataType{ 2138 TypeKind: "INT64", 2139 }, 2140 }, 2141 }, 2142 }) 2143 if err != nil { 2144 t.Fatalf("Create: %v", err) 2145 } 2146} 2147 2148func TestIntegration_RoutineComplexTypes(t *testing.T) { 2149 if client == nil { 2150 t.Skip("Integration tests skipped") 2151 } 2152 ctx := context.Background() 2153 2154 routineID := routineIDs.New() 2155 routine := dataset.Routine(routineID) 2156 sql := fmt.Sprintf(` 2157 CREATE FUNCTION `+"`%s`("+` 2158 arr ARRAY<STRUCT<name STRING, val INT64>> 2159 ) AS ( 2160 (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem) 2161 )`, 2162 routine.FullyQualifiedName()) 2163 if err := runQueryJob(ctx, sql); err != nil { 2164 t.Fatal(err) 2165 } 2166 defer routine.Delete(ctx) 2167 2168 meta, err := routine.Metadata(ctx) 2169 if err != nil { 2170 t.Fatalf("Metadata: %v", err) 2171 } 2172 if meta.Type != "SCALAR_FUNCTION" { 2173 t.Fatalf("routine type mismatch, got %s want SCALAR_FUNCTION", meta.Type) 2174 } 2175 if meta.Language != "SQL" { 2176 t.Fatalf("language type mismatch, got %s want SQL", meta.Language) 2177 } 2178 want := []*RoutineArgument{ 2179 { 2180 Name: "arr", 2181 DataType: &StandardSQLDataType{ 2182 TypeKind: "ARRAY", 2183 ArrayElementType: &StandardSQLDataType{ 2184 TypeKind: "STRUCT", 2185 StructType: &StandardSQLStructType{ 2186 Fields: []*StandardSQLField{ 2187 { 2188 Name: "name", 2189 Type: &StandardSQLDataType{ 2190 TypeKind: "STRING", 2191 }, 2192 }, 2193 { 2194 Name: "val", 2195 Type: &StandardSQLDataType{ 2196 TypeKind: "INT64", 2197 }, 2198 }, 2199 }, 2200 }, 2201 }, 2202 }, 2203 }, 2204 } 2205 if diff := testutil.Diff(meta.Arguments, want); diff != "" { 2206 t.Fatalf("%+v: -got, +want:\n%s", meta.Arguments, diff) 2207 } 2208} 2209 2210func TestIntegration_RoutineLifecycle(t *testing.T) { 2211 if client == nil { 2212 t.Skip("Integration tests skipped") 2213 } 2214 ctx := context.Background() 2215 2216 // Create a scalar UDF routine via a CREATE FUNCTION query 2217 routineID := routineIDs.New() 2218 routine := dataset.Routine(routineID) 2219 2220 sql := fmt.Sprintf(` 2221 CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, 2222 routine.FullyQualifiedName()) 2223 if err := runQueryJob(ctx, sql); err != nil { 2224 t.Fatal(err) 2225 } 2226 defer routine.Delete(ctx) 2227 2228 // Get the routine metadata. 2229 curMeta, err := routine.Metadata(ctx) 2230 if err != nil { 2231 t.Fatalf("couldn't get metadata: %v", err) 2232 } 2233 2234 want := "SCALAR_FUNCTION" 2235 if curMeta.Type != want { 2236 t.Errorf("Routine type mismatch. got %s want %s", curMeta.Type, want) 2237 } 2238 2239 want = "SQL" 2240 if curMeta.Language != want { 2241 t.Errorf("Language mismatch. got %s want %s", curMeta.Language, want) 2242 } 2243 2244 // Perform an update to change the routine body. 2245 want = "x * 4" 2246 // during beta, update doesn't allow partial updates. Provide all fields. 2247 newMeta, err := routine.Update(ctx, &RoutineMetadataToUpdate{ 2248 Body: want, 2249 Arguments: curMeta.Arguments, 2250 ReturnType: curMeta.ReturnType, 2251 Type: curMeta.Type, 2252 }, curMeta.ETag) 2253 if err != nil { 2254 t.Fatalf("Update: %v", err) 2255 } 2256 if newMeta.Body != want { 2257 t.Fatalf("Update failed. want %s got %s", want, newMeta.Body) 2258 } 2259 2260 // Ensure presence when enumerating the model list. 2261 it := dataset.Routines(ctx) 2262 seen := false 2263 for { 2264 r, err := it.Next() 2265 if err == iterator.Done { 2266 break 2267 } 2268 if err != nil { 2269 t.Fatal(err) 2270 } 2271 if r.RoutineID == routineID { 2272 seen = true 2273 } 2274 } 2275 if !seen { 2276 t.Fatal("routine not listed in dataset") 2277 } 2278 2279 // Delete the model. 2280 if err := routine.Delete(ctx); err != nil { 2281 t.Fatalf("failed to delete routine: %v", err) 2282 } 2283} 2284 2285// Creates a new, temporary table with a unique name and the given schema. 2286func newTable(t *testing.T, s Schema) *Table { 2287 table := dataset.Table(tableIDs.New()) 2288 err := table.Create(context.Background(), &TableMetadata{ 2289 Schema: s, 2290 ExpirationTime: testTableExpiration, 2291 }) 2292 if err != nil { 2293 t.Fatal(err) 2294 } 2295 return table 2296} 2297 2298func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) { 2299 if msg2, ok := compareRead(it, want, false); !ok { 2300 t.Errorf("%s: %s", msg, msg2) 2301 } 2302} 2303 2304func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) { 2305 if msg2, ok := compareRead(it, want, true); !ok { 2306 t.Errorf("%s: %s", msg, msg2) 2307 } 2308} 2309 2310func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) { 2311 got, _, totalRows, err := readAll(it) 2312 if err != nil { 2313 return err.Error(), false 2314 } 2315 if len(got) != len(want) { 2316 return fmt.Sprintf("got %d rows, want %d", len(got), len(want)), false 2317 } 2318 if compareTotalRows && len(got) != int(totalRows) { 2319 return fmt.Sprintf("got %d rows, but totalRows = %d", len(got), totalRows), false 2320 } 2321 sort.Sort(byCol0(got)) 2322 for i, r := range got { 2323 gotRow := []Value(r) 2324 wantRow := want[i] 2325 if !testutil.Equal(gotRow, wantRow) { 2326 return fmt.Sprintf("#%d: got %#v, want %#v", i, gotRow, wantRow), false 2327 } 2328 } 2329 return "", true 2330} 2331 2332func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) { 2333 var ( 2334 rows [][]Value 2335 schema Schema 2336 totalRows uint64 2337 ) 2338 for { 2339 var vals []Value 2340 err := it.Next(&vals) 2341 if err == iterator.Done { 2342 return rows, schema, totalRows, nil 2343 } 2344 if err != nil { 2345 return nil, nil, 0, err 2346 } 2347 rows = append(rows, vals) 2348 schema = it.Schema 2349 totalRows = it.TotalRows 2350 } 2351} 2352 2353type byCol0 [][]Value 2354 2355func (b byCol0) Len() int { return len(b) } 2356func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 2357func (b byCol0) Less(i, j int) bool { 2358 switch a := b[i][0].(type) { 2359 case string: 2360 return a < b[j][0].(string) 2361 case civil.Date: 2362 return a.Before(b[j][0].(civil.Date)) 2363 default: 2364 panic("unknown type") 2365 } 2366} 2367 2368func hasStatusCode(err error, code int) bool { 2369 if e, ok := err.(*googleapi.Error); ok && e.Code == code { 2370 return true 2371 } 2372 return false 2373} 2374 2375// wait polls the job until it is complete or an error is returned. 2376func wait(ctx context.Context, job *Job) error { 2377 status, err := job.Wait(ctx) 2378 if err != nil { 2379 return err 2380 } 2381 if status.Err() != nil { 2382 return fmt.Errorf("job status error: %#v", status.Err()) 2383 } 2384 if status.Statistics == nil { 2385 return errors.New("nil Statistics") 2386 } 2387 if status.Statistics.EndTime.IsZero() { 2388 return errors.New("EndTime is zero") 2389 } 2390 if status.Statistics.Details == nil { 2391 return errors.New("nil Statistics.Details") 2392 } 2393 return nil 2394} 2395 2396// waitForRow polls the table until it contains a row. 2397// TODO(jba): use internal.Retry. 2398func waitForRow(ctx context.Context, table *Table) error { 2399 for { 2400 it := table.Read(ctx) 2401 var v []Value 2402 err := it.Next(&v) 2403 if err == nil { 2404 return nil 2405 } 2406 if err != iterator.Done { 2407 return err 2408 } 2409 time.Sleep(1 * time.Second) 2410 } 2411} 2412 2413func putError(err error) string { 2414 pme, ok := err.(PutMultiError) 2415 if !ok { 2416 return err.Error() 2417 } 2418 var msgs []string 2419 for _, err := range pme { 2420 msgs = append(msgs, err.Error()) 2421 } 2422 return strings.Join(msgs, "\n") 2423} 2424