1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "encoding/json" 20 "errors" 21 "flag" 22 "fmt" 23 "log" 24 "math/big" 25 "net/http" 26 "os" 27 "sort" 28 "strings" 29 "testing" 30 "time" 31 32 "cloud.google.com/go/civil" 33 "cloud.google.com/go/httpreplay" 34 "cloud.google.com/go/internal" 35 "cloud.google.com/go/internal/pretty" 36 "cloud.google.com/go/internal/testutil" 37 "cloud.google.com/go/internal/uid" 38 "cloud.google.com/go/storage" 39 "github.com/google/go-cmp/cmp" 40 "github.com/google/go-cmp/cmp/cmpopts" 41 gax "github.com/googleapis/gax-go/v2" 42 "google.golang.org/api/googleapi" 43 "google.golang.org/api/iterator" 44 "google.golang.org/api/option" 45) 46 47const replayFilename = "bigquery.replay" 48 49var record = flag.Bool("record", false, "record RPCs") 50 51var ( 52 client *Client 53 storageClient *storage.Client 54 dataset *Dataset 55 schema = Schema{ 56 {Name: "name", Type: StringFieldType}, 57 {Name: "nums", Type: IntegerFieldType, Repeated: true}, 58 {Name: "rec", Type: RecordFieldType, Schema: Schema{ 59 {Name: "bool", Type: BooleanFieldType}, 60 }}, 61 } 62 testTableExpiration time.Time 63 datasetIDs, tableIDs, modelIDs, routineIDs *uid.Space 64) 65 66// Note: integration tests cannot be run in parallel, because TestIntegration_Location 67// modifies the client. 68 69func TestMain(m *testing.M) { 70 cleanup := initIntegrationTest() 71 r := m.Run() 72 cleanup() 73 os.Exit(r) 74} 75 76func getClient(t *testing.T) *Client { 77 if client == nil { 78 t.Skip("Integration tests skipped") 79 } 80 return client 81} 82 83var grpcHeadersChecker = testutil.DefaultHeadersEnforcer() 84 85// If integration tests will be run, create a unique dataset for them. 86// Return a cleanup function. 87func initIntegrationTest() func() { 88 ctx := context.Background() 89 flag.Parse() // needed for testing.Short() 90 projID := testutil.ProjID() 91 switch { 92 case testing.Short() && *record: 93 log.Fatal("cannot combine -short and -record") 94 return func() {} 95 96 case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "": 97 // go test -short with a replay file will replay the integration tests if the 98 // environment variables are set. 99 log.Printf("replaying from %s", replayFilename) 100 httpreplay.DebugHeaders() 101 replayer, err := httpreplay.NewReplayer(replayFilename) 102 if err != nil { 103 log.Fatal(err) 104 } 105 var t time.Time 106 if err := json.Unmarshal(replayer.Initial(), &t); err != nil { 107 log.Fatal(err) 108 } 109 hc, err := replayer.Client(ctx) // no creds needed 110 if err != nil { 111 log.Fatal(err) 112 } 113 client, err = NewClient(ctx, projID, option.WithHTTPClient(hc)) 114 if err != nil { 115 log.Fatal(err) 116 } 117 storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc)) 118 if err != nil { 119 log.Fatal(err) 120 } 121 cleanup := initTestState(client, t) 122 return func() { 123 cleanup() 124 _ = replayer.Close() // No actionable error returned. 125 } 126 127 case testing.Short(): 128 // go test -short without a replay file skips the integration tests. 129 if testutil.CanReplay(replayFilename) && projID != "" { 130 log.Print("replay not supported for Go versions before 1.8") 131 } 132 client = nil 133 storageClient = nil 134 return func() {} 135 136 default: // Run integration tests against a real backend. 137 ts := testutil.TokenSource(ctx, Scope) 138 if ts == nil { 139 log.Println("Integration tests skipped. See CONTRIBUTING.md for details") 140 return func() {} 141 } 142 bqOpts := []option.ClientOption{option.WithTokenSource(ts)} 143 sOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))} 144 cleanup := func() {} 145 now := time.Now().UTC() 146 if *record { 147 if !httpreplay.Supported() { 148 log.Print("record not supported for Go versions before 1.8") 149 } else { 150 nowBytes, err := json.Marshal(now) 151 if err != nil { 152 log.Fatal(err) 153 } 154 recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes) 155 if err != nil { 156 log.Fatalf("could not record: %v", err) 157 } 158 log.Printf("recording to %s", replayFilename) 159 hc, err := recorder.Client(ctx, bqOpts...) 160 if err != nil { 161 log.Fatal(err) 162 } 163 bqOpts = append(bqOpts, option.WithHTTPClient(hc)) 164 hc, err = recorder.Client(ctx, sOpts...) 165 if err != nil { 166 log.Fatal(err) 167 } 168 sOpts = append(sOpts, option.WithHTTPClient(hc)) 169 cleanup = func() { 170 if err := recorder.Close(); err != nil { 171 log.Printf("saving recording: %v", err) 172 } 173 } 174 } 175 } else { 176 // When we're not recording, do http header checking. 177 // We can't check universally because option.WithHTTPClient is 178 // incompatible with gRPC options. 179 bqOpts = append(bqOpts, grpcHeadersChecker.CallOptions()...) 180 sOpts = append(sOpts, grpcHeadersChecker.CallOptions()...) 181 } 182 var err error 183 client, err = NewClient(ctx, projID, bqOpts...) 184 if err != nil { 185 log.Fatalf("NewClient: %v", err) 186 } 187 storageClient, err = storage.NewClient(ctx, sOpts...) 188 if err != nil { 189 log.Fatalf("storage.NewClient: %v", err) 190 } 191 c := initTestState(client, now) 192 return func() { c(); cleanup() } 193 } 194} 195 196func initTestState(client *Client, t time.Time) func() { 197 // BigQuery does not accept hyphens in dataset or table IDs, so we create IDs 198 // with underscores. 199 ctx := context.Background() 200 opts := &uid.Options{Sep: '_', Time: t} 201 datasetIDs = uid.NewSpace("dataset", opts) 202 tableIDs = uid.NewSpace("table", opts) 203 modelIDs = uid.NewSpace("model", opts) 204 routineIDs = uid.NewSpace("routine", opts) 205 testTableExpiration = t.Add(10 * time.Minute).Round(time.Second) 206 // For replayability, seed the random source with t. 207 Seed(t.UnixNano()) 208 209 dataset = client.Dataset(datasetIDs.New()) 210 if err := dataset.Create(ctx, nil); err != nil { 211 log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err) 212 } 213 return func() { 214 if err := dataset.DeleteWithContents(ctx); err != nil { 215 log.Printf("could not delete %s", dataset.DatasetID) 216 } 217 } 218} 219 220func TestIntegration_TableCreate(t *testing.T) { 221 // Check that creating a record field with an empty schema is an error. 222 if client == nil { 223 t.Skip("Integration tests skipped") 224 } 225 table := dataset.Table("t_bad") 226 schema := Schema{ 227 {Name: "rec", Type: RecordFieldType, Schema: Schema{}}, 228 } 229 err := table.Create(context.Background(), &TableMetadata{ 230 Schema: schema, 231 ExpirationTime: testTableExpiration.Add(5 * time.Minute), 232 }) 233 if err == nil { 234 t.Fatal("want error, got nil") 235 } 236 if !hasStatusCode(err, http.StatusBadRequest) { 237 t.Fatalf("want a 400 error, got %v", err) 238 } 239} 240 241func TestIntegration_TableCreateView(t *testing.T) { 242 if client == nil { 243 t.Skip("Integration tests skipped") 244 } 245 ctx := context.Background() 246 table := newTable(t, schema) 247 defer table.Delete(ctx) 248 249 // Test that standard SQL views work. 250 view := dataset.Table("t_view_standardsql") 251 query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`", 252 dataset.ProjectID, dataset.DatasetID, table.TableID) 253 err := view.Create(context.Background(), &TableMetadata{ 254 ViewQuery: query, 255 UseStandardSQL: true, 256 }) 257 if err != nil { 258 t.Fatalf("table.create: Did not expect an error, got: %v", err) 259 } 260 if err := view.Delete(ctx); err != nil { 261 t.Fatal(err) 262 } 263} 264 265func TestIntegration_TableMetadata(t *testing.T) { 266 267 if client == nil { 268 t.Skip("Integration tests skipped") 269 } 270 ctx := context.Background() 271 table := newTable(t, schema) 272 defer table.Delete(ctx) 273 // Check table metadata. 274 md, err := table.Metadata(ctx) 275 if err != nil { 276 t.Fatal(err) 277 } 278 // TODO(jba): check md more thorougly. 279 if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want { 280 t.Errorf("metadata.FullID: got %q, want %q", got, want) 281 } 282 if got, want := md.Type, RegularTable; got != want { 283 t.Errorf("metadata.Type: got %v, want %v", got, want) 284 } 285 if got, want := md.ExpirationTime, testTableExpiration; !got.Equal(want) { 286 t.Errorf("metadata.Type: got %v, want %v", got, want) 287 } 288 289 // Check that timePartitioning is nil by default 290 if md.TimePartitioning != nil { 291 t.Errorf("metadata.TimePartitioning: got %v, want %v", md.TimePartitioning, nil) 292 } 293 294 // Create tables that have time partitioning 295 partitionCases := []struct { 296 timePartitioning TimePartitioning 297 wantExpiration time.Duration 298 wantField string 299 wantPruneFilter bool 300 }{ 301 {TimePartitioning{}, time.Duration(0), "", false}, 302 {TimePartitioning{Expiration: time.Second}, time.Second, "", false}, 303 {TimePartitioning{RequirePartitionFilter: true}, time.Duration(0), "", true}, 304 { 305 TimePartitioning{ 306 Expiration: time.Second, 307 Field: "date", 308 RequirePartitionFilter: true, 309 }, time.Second, "date", true}, 310 } 311 312 schema2 := Schema{ 313 {Name: "name", Type: StringFieldType}, 314 {Name: "date", Type: DateFieldType}, 315 } 316 317 clustering := &Clustering{ 318 Fields: []string{"name"}, 319 } 320 321 // Currently, clustering depends on partitioning. Interleave testing of the two features. 322 for i, c := range partitionCases { 323 table := dataset.Table(fmt.Sprintf("t_metadata_partition_nocluster_%v", i)) 324 clusterTable := dataset.Table(fmt.Sprintf("t_metadata_partition_cluster_%v", i)) 325 326 // Create unclustered, partitioned variant and get metadata. 327 err = table.Create(context.Background(), &TableMetadata{ 328 Schema: schema2, 329 TimePartitioning: &c.timePartitioning, 330 ExpirationTime: testTableExpiration, 331 }) 332 if err != nil { 333 t.Fatal(err) 334 } 335 defer table.Delete(ctx) 336 md, err := table.Metadata(ctx) 337 if err != nil { 338 t.Fatal(err) 339 } 340 341 // Created clustered table and get metadata. 342 err = clusterTable.Create(context.Background(), &TableMetadata{ 343 Schema: schema2, 344 TimePartitioning: &c.timePartitioning, 345 ExpirationTime: testTableExpiration, 346 Clustering: clustering, 347 }) 348 if err != nil { 349 t.Fatal(err) 350 } 351 clusterMD, err := clusterTable.Metadata(ctx) 352 if err != nil { 353 t.Fatal(err) 354 } 355 356 for _, v := range []*TableMetadata{md, clusterMD} { 357 got := v.TimePartitioning 358 want := &TimePartitioning{ 359 Expiration: c.wantExpiration, 360 Field: c.wantField, 361 RequirePartitionFilter: c.wantPruneFilter, 362 } 363 if !testutil.Equal(got, want) { 364 t.Errorf("metadata.TimePartitioning: got %v, want %v", got, want) 365 } 366 // Manipulate RequirePartitionFilter at the table level. 367 mdUpdate := TableMetadataToUpdate{ 368 RequirePartitionFilter: !want.RequirePartitionFilter, 369 } 370 371 newmd, err := table.Update(ctx, mdUpdate, "") 372 if err != nil { 373 t.Errorf("failed to invert RequirePartitionFilter on %s: %v", table.FullyQualifiedName(), err) 374 } 375 if newmd.RequirePartitionFilter == want.RequirePartitionFilter { 376 t.Errorf("inverting table-level RequirePartitionFilter on %s failed, want %t got %t", table.FullyQualifiedName(), !want.RequirePartitionFilter, newmd.RequirePartitionFilter) 377 } 378 // Also verify that the clone of RequirePartitionFilter in the TimePartitioning message is consistent. 379 if newmd.RequirePartitionFilter != newmd.TimePartitioning.RequirePartitionFilter { 380 t.Errorf("inconsistent RequirePartitionFilter. Table: %t, TimePartitioning: %t", newmd.RequirePartitionFilter, newmd.TimePartitioning.RequirePartitionFilter) 381 } 382 383 } 384 385 if md.Clustering != nil { 386 t.Errorf("metadata.Clustering was not nil on unclustered table %s", table.TableID) 387 } 388 got := clusterMD.Clustering 389 want := clustering 390 if clusterMD.Clustering != clustering { 391 if !testutil.Equal(got, want) { 392 t.Errorf("metadata.Clustering: got %v, want %v", got, want) 393 } 394 } 395 } 396 397} 398 399func TestIntegration_RangePartitioning(t *testing.T) { 400 if client == nil { 401 t.Skip("Integration tests skipped") 402 } 403 ctx := context.Background() 404 table := dataset.Table(tableIDs.New()) 405 406 schema := Schema{ 407 {Name: "name", Type: StringFieldType}, 408 {Name: "somevalue", Type: IntegerFieldType}, 409 } 410 411 wantedRange := &RangePartitioningRange{ 412 Start: 10, 413 End: 135, 414 Interval: 25, 415 } 416 417 wantedPartitioning := &RangePartitioning{ 418 Field: "somevalue", 419 Range: wantedRange, 420 } 421 422 err := table.Create(context.Background(), &TableMetadata{ 423 Schema: schema, 424 RangePartitioning: wantedPartitioning, 425 }) 426 if err != nil { 427 t.Fatal(err) 428 } 429 defer table.Delete(ctx) 430 md, err := table.Metadata(ctx) 431 if err != nil { 432 t.Fatal(err) 433 } 434 435 if md.RangePartitioning == nil { 436 t.Fatal("expected range partitioning, got nil") 437 } 438 got := md.RangePartitioning.Field 439 if wantedPartitioning.Field != got { 440 t.Errorf("RangePartitioning Field: got %v, want %v", got, wantedPartitioning.Field) 441 } 442 if md.RangePartitioning.Range == nil { 443 t.Fatal("expected a range definition, got nil") 444 } 445 gotInt64 := md.RangePartitioning.Range.Start 446 if gotInt64 != wantedRange.Start { 447 t.Errorf("Range.Start: got %v, wanted %v", gotInt64, wantedRange.Start) 448 } 449 gotInt64 = md.RangePartitioning.Range.End 450 if gotInt64 != wantedRange.End { 451 t.Errorf("Range.End: got %v, wanted %v", gotInt64, wantedRange.End) 452 } 453 gotInt64 = md.RangePartitioning.Range.Interval 454 if gotInt64 != wantedRange.Interval { 455 t.Errorf("Range.Interval: got %v, wanted %v", gotInt64, wantedRange.Interval) 456 } 457} 458func TestIntegration_RemoveTimePartitioning(t *testing.T) { 459 if client == nil { 460 t.Skip("Integration tests skipped") 461 } 462 ctx := context.Background() 463 table := dataset.Table(tableIDs.New()) 464 want := 24 * time.Hour 465 err := table.Create(ctx, &TableMetadata{ 466 ExpirationTime: testTableExpiration, 467 TimePartitioning: &TimePartitioning{ 468 Expiration: want, 469 }, 470 }) 471 if err != nil { 472 t.Fatal(err) 473 } 474 defer table.Delete(ctx) 475 476 md, err := table.Metadata(ctx) 477 if err != nil { 478 t.Fatal(err) 479 } 480 if got := md.TimePartitioning.Expiration; got != want { 481 t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got) 482 } 483 484 // Remove time partitioning expiration 485 md, err = table.Update(context.Background(), TableMetadataToUpdate{ 486 TimePartitioning: &TimePartitioning{Expiration: 0}, 487 }, md.ETag) 488 if err != nil { 489 t.Fatal(err) 490 } 491 492 want = time.Duration(0) 493 if got := md.TimePartitioning.Expiration; got != want { 494 t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got) 495 } 496} 497 498func TestIntegration_DatasetCreate(t *testing.T) { 499 if client == nil { 500 t.Skip("Integration tests skipped") 501 } 502 ctx := context.Background() 503 ds := client.Dataset(datasetIDs.New()) 504 wmd := &DatasetMetadata{Name: "name", Location: "EU"} 505 err := ds.Create(ctx, wmd) 506 if err != nil { 507 t.Fatal(err) 508 } 509 gmd, err := ds.Metadata(ctx) 510 if err != nil { 511 t.Fatal(err) 512 } 513 if got, want := gmd.Name, wmd.Name; got != want { 514 t.Errorf("name: got %q, want %q", got, want) 515 } 516 if got, want := gmd.Location, wmd.Location; got != want { 517 t.Errorf("location: got %q, want %q", got, want) 518 } 519 if err := ds.Delete(ctx); err != nil { 520 t.Fatalf("deleting dataset %v: %v", ds, err) 521 } 522} 523 524func TestIntegration_DatasetMetadata(t *testing.T) { 525 if client == nil { 526 t.Skip("Integration tests skipped") 527 } 528 ctx := context.Background() 529 md, err := dataset.Metadata(ctx) 530 if err != nil { 531 t.Fatal(err) 532 } 533 if got, want := md.FullID, fmt.Sprintf("%s:%s", dataset.ProjectID, dataset.DatasetID); got != want { 534 t.Errorf("FullID: got %q, want %q", got, want) 535 } 536 jan2016 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC) 537 if md.CreationTime.Before(jan2016) { 538 t.Errorf("CreationTime: got %s, want > 2016-1-1", md.CreationTime) 539 } 540 if md.LastModifiedTime.Before(jan2016) { 541 t.Errorf("LastModifiedTime: got %s, want > 2016-1-1", md.LastModifiedTime) 542 } 543 544 // Verify that we get a NotFound for a nonexistent dataset. 545 _, err = client.Dataset("does_not_exist").Metadata(ctx) 546 if err == nil || !hasStatusCode(err, http.StatusNotFound) { 547 t.Errorf("got %v, want NotFound error", err) 548 } 549} 550 551func TestIntegration_DatasetDelete(t *testing.T) { 552 if client == nil { 553 t.Skip("Integration tests skipped") 554 } 555 ctx := context.Background() 556 ds := client.Dataset(datasetIDs.New()) 557 if err := ds.Create(ctx, nil); err != nil { 558 t.Fatalf("creating dataset %s: %v", ds.DatasetID, err) 559 } 560 if err := ds.Delete(ctx); err != nil { 561 t.Fatalf("deleting dataset %s: %v", ds.DatasetID, err) 562 } 563} 564 565func TestIntegration_DatasetDeleteWithContents(t *testing.T) { 566 if client == nil { 567 t.Skip("Integration tests skipped") 568 } 569 ctx := context.Background() 570 ds := client.Dataset(datasetIDs.New()) 571 if err := ds.Create(ctx, nil); err != nil { 572 t.Fatalf("creating dataset %s: %v", ds.DatasetID, err) 573 } 574 table := ds.Table(tableIDs.New()) 575 if err := table.Create(ctx, nil); err != nil { 576 t.Fatalf("creating table %s in dataset %s: %v", table.TableID, table.DatasetID, err) 577 } 578 // We expect failure here 579 if err := ds.Delete(ctx); err == nil { 580 t.Fatalf("non-recursive delete of dataset %s succeeded unexpectedly.", ds.DatasetID) 581 } 582 if err := ds.DeleteWithContents(ctx); err != nil { 583 t.Fatalf("deleting recursively dataset %s: %v", ds.DatasetID, err) 584 } 585} 586 587func TestIntegration_DatasetUpdateETags(t *testing.T) { 588 if client == nil { 589 t.Skip("Integration tests skipped") 590 } 591 592 check := func(md *DatasetMetadata, wantDesc, wantName string) { 593 if md.Description != wantDesc { 594 t.Errorf("description: got %q, want %q", md.Description, wantDesc) 595 } 596 if md.Name != wantName { 597 t.Errorf("name: got %q, want %q", md.Name, wantName) 598 } 599 } 600 601 ctx := context.Background() 602 md, err := dataset.Metadata(ctx) 603 if err != nil { 604 t.Fatal(err) 605 } 606 if md.ETag == "" { 607 t.Fatal("empty ETag") 608 } 609 // Write without ETag succeeds. 610 desc := md.Description + "d2" 611 name := md.Name + "n2" 612 md2, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: desc, Name: name}, "") 613 if err != nil { 614 t.Fatal(err) 615 } 616 check(md2, desc, name) 617 618 // Write with original ETag fails because of intervening write. 619 _, err = dataset.Update(ctx, DatasetMetadataToUpdate{Description: "d", Name: "n"}, md.ETag) 620 if err == nil { 621 t.Fatal("got nil, want error") 622 } 623 624 // Write with most recent ETag succeeds. 625 md3, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: "", Name: ""}, md2.ETag) 626 if err != nil { 627 t.Fatal(err) 628 } 629 check(md3, "", "") 630} 631 632func TestIntegration_DatasetUpdateDefaultExpiration(t *testing.T) { 633 if client == nil { 634 t.Skip("Integration tests skipped") 635 } 636 ctx := context.Background() 637 _, err := dataset.Metadata(ctx) 638 if err != nil { 639 t.Fatal(err) 640 } 641 // Set the default expiration time. 642 md, err := dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Hour}, "") 643 if err != nil { 644 t.Fatal(err) 645 } 646 if md.DefaultTableExpiration != time.Hour { 647 t.Fatalf("got %s, want 1h", md.DefaultTableExpiration) 648 } 649 // Omitting DefaultTableExpiration doesn't change it. 650 md, err = dataset.Update(ctx, DatasetMetadataToUpdate{Name: "xyz"}, "") 651 if err != nil { 652 t.Fatal(err) 653 } 654 if md.DefaultTableExpiration != time.Hour { 655 t.Fatalf("got %s, want 1h", md.DefaultTableExpiration) 656 } 657 // Setting it to 0 deletes it (which looks like a 0 duration). 658 md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Duration(0)}, "") 659 if err != nil { 660 t.Fatal(err) 661 } 662 if md.DefaultTableExpiration != 0 { 663 t.Fatalf("got %s, want 0", md.DefaultTableExpiration) 664 } 665} 666 667func TestIntegration_DatasetUpdateAccess(t *testing.T) { 668 if client == nil { 669 t.Skip("Integration tests skipped") 670 } 671 ctx := context.Background() 672 md, err := dataset.Metadata(ctx) 673 if err != nil { 674 t.Fatal(err) 675 } 676 origAccess := append([]*AccessEntry(nil), md.Access...) 677 newEntries := []*AccessEntry{ 678 { 679 Role: ReaderRole, 680 Entity: "Joe@example.com", 681 EntityType: UserEmailEntity, 682 }, 683 { 684 Role: ReaderRole, 685 Entity: "allUsers", 686 EntityType: IAMMemberEntity, 687 }, 688 } 689 690 newAccess := append(md.Access, newEntries...) 691 dm := DatasetMetadataToUpdate{Access: newAccess} 692 md, err = dataset.Update(ctx, dm, md.ETag) 693 if err != nil { 694 t.Fatal(err) 695 } 696 defer func() { 697 _, err := dataset.Update(ctx, DatasetMetadataToUpdate{Access: origAccess}, md.ETag) 698 if err != nil { 699 t.Log("could not restore dataset access list") 700 } 701 }() 702 for _, v := range md.Access { 703 fmt.Printf("md %+v\n", v) 704 } 705 for _, v := range newAccess { 706 fmt.Printf("newAccess %+v\n", v) 707 } 708 if diff := testutil.Diff(md.Access, newAccess, cmpopts.SortSlices(lessAccessEntries)); diff != "" { 709 t.Fatalf("got=-, want=+:\n%s", diff) 710 } 711} 712 713// Comparison function for AccessEntries to enable order insensitive equality checking. 714func lessAccessEntries(x, y *AccessEntry) bool { 715 if x.Entity < y.Entity { 716 return true 717 } 718 if x.Entity > y.Entity { 719 return false 720 } 721 if x.EntityType < y.EntityType { 722 return true 723 } 724 if x.EntityType > y.EntityType { 725 return false 726 } 727 if x.Role < y.Role { 728 return true 729 } 730 if x.Role > y.Role { 731 return false 732 } 733 if x.View == nil { 734 return y.View != nil 735 } 736 return false 737} 738 739func TestIntegration_DatasetUpdateLabels(t *testing.T) { 740 if client == nil { 741 t.Skip("Integration tests skipped") 742 } 743 ctx := context.Background() 744 _, err := dataset.Metadata(ctx) 745 if err != nil { 746 t.Fatal(err) 747 } 748 var dm DatasetMetadataToUpdate 749 dm.SetLabel("label", "value") 750 md, err := dataset.Update(ctx, dm, "") 751 if err != nil { 752 t.Fatal(err) 753 } 754 if got, want := md.Labels["label"], "value"; got != want { 755 t.Errorf("got %q, want %q", got, want) 756 } 757 dm = DatasetMetadataToUpdate{} 758 dm.DeleteLabel("label") 759 md, err = dataset.Update(ctx, dm, "") 760 if err != nil { 761 t.Fatal(err) 762 } 763 if _, ok := md.Labels["label"]; ok { 764 t.Error("label still present after deletion") 765 } 766} 767 768func TestIntegration_TableUpdateLabels(t *testing.T) { 769 if client == nil { 770 t.Skip("Integration tests skipped") 771 } 772 ctx := context.Background() 773 table := newTable(t, schema) 774 defer table.Delete(ctx) 775 776 var tm TableMetadataToUpdate 777 tm.SetLabel("label", "value") 778 md, err := table.Update(ctx, tm, "") 779 if err != nil { 780 t.Fatal(err) 781 } 782 if got, want := md.Labels["label"], "value"; got != want { 783 t.Errorf("got %q, want %q", got, want) 784 } 785 tm = TableMetadataToUpdate{} 786 tm.DeleteLabel("label") 787 md, err = table.Update(ctx, tm, "") 788 if err != nil { 789 t.Fatal(err) 790 } 791 if _, ok := md.Labels["label"]; ok { 792 t.Error("label still present after deletion") 793 } 794} 795 796func TestIntegration_Tables(t *testing.T) { 797 if client == nil { 798 t.Skip("Integration tests skipped") 799 } 800 ctx := context.Background() 801 table := newTable(t, schema) 802 defer table.Delete(ctx) 803 wantName := table.FullyQualifiedName() 804 805 // This test is flaky due to eventual consistency. 806 ctx, cancel := context.WithTimeout(ctx, 10*time.Second) 807 defer cancel() 808 err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { 809 // Iterate over tables in the dataset. 810 it := dataset.Tables(ctx) 811 var tableNames []string 812 for { 813 tbl, err := it.Next() 814 if err == iterator.Done { 815 break 816 } 817 if err != nil { 818 return false, err 819 } 820 tableNames = append(tableNames, tbl.FullyQualifiedName()) 821 } 822 // Other tests may be running with this dataset, so there might be more 823 // than just our table in the list. So don't try for an exact match; just 824 // make sure that our table is there somewhere. 825 for _, tn := range tableNames { 826 if tn == wantName { 827 return true, nil 828 } 829 } 830 return false, fmt.Errorf("got %v\nwant %s in the list", tableNames, wantName) 831 }) 832 if err != nil { 833 t.Fatal(err) 834 } 835} 836 837func TestIntegration_SimpleRowResults(t *testing.T) { 838 if client == nil { 839 t.Skip("Integration tests skipped") 840 } 841 ctx := context.Background() 842 843 testCases := []struct { 844 description string 845 query string 846 want [][]Value 847 }{ 848 { 849 description: "literals", 850 query: "select 17 as foo", 851 want: [][]Value{{int64(17)}}, 852 }, 853 { 854 description: "empty results", 855 query: "SELECT * FROM (select 17 as foo) where false", 856 want: [][]Value{}, 857 }, 858 { 859 // Note: currently CTAS returns the rows due to the destination table reference, 860 // but it's not clear that it should. 861 // https://github.com/googleapis/google-cloud-go/issues/1467 for followup. 862 description: "ctas ddl", 863 query: fmt.Sprintf("CREATE TABLE %s.%s AS SELECT 17 as foo", dataset.DatasetID, tableIDs.New()), 864 want: [][]Value{{int64(17)}}, 865 }, 866 } 867 for _, tc := range testCases { 868 curCase := tc 869 t.Run(curCase.description, func(t *testing.T) { 870 t.Parallel() 871 q := client.Query(curCase.query) 872 it, err := q.Read(ctx) 873 if err != nil { 874 t.Fatalf("%s read error: %v", curCase.description, err) 875 } 876 checkReadAndTotalRows(t, curCase.description, it, curCase.want) 877 }) 878 } 879} 880 881func TestIntegration_InsertAndRead(t *testing.T) { 882 if client == nil { 883 t.Skip("Integration tests skipped") 884 } 885 ctx := context.Background() 886 table := newTable(t, schema) 887 defer table.Delete(ctx) 888 889 // Populate the table. 890 ins := table.Inserter() 891 var ( 892 wantRows [][]Value 893 saverRows []*ValuesSaver 894 ) 895 for i, name := range []string{"a", "b", "c"} { 896 row := []Value{name, []Value{int64(i)}, []Value{true}} 897 wantRows = append(wantRows, row) 898 saverRows = append(saverRows, &ValuesSaver{ 899 Schema: schema, 900 InsertID: name, 901 Row: row, 902 }) 903 } 904 if err := ins.Put(ctx, saverRows); err != nil { 905 t.Fatal(putError(err)) 906 } 907 908 // Wait until the data has been uploaded. This can take a few seconds, according 909 // to https://cloud.google.com/bigquery/streaming-data-into-bigquery. 910 if err := waitForRow(ctx, table); err != nil { 911 t.Fatal(err) 912 } 913 // Read the table. 914 checkRead(t, "upload", table.Read(ctx), wantRows) 915 916 // Query the table. 917 q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID)) 918 q.DefaultProjectID = dataset.ProjectID 919 q.DefaultDatasetID = dataset.DatasetID 920 921 rit, err := q.Read(ctx) 922 if err != nil { 923 t.Fatal(err) 924 } 925 checkRead(t, "query", rit, wantRows) 926 927 // Query the long way. 928 job1, err := q.Run(ctx) 929 if err != nil { 930 t.Fatal(err) 931 } 932 if job1.LastStatus() == nil { 933 t.Error("no LastStatus") 934 } 935 job2, err := client.JobFromID(ctx, job1.ID()) 936 if err != nil { 937 t.Fatal(err) 938 } 939 if job2.LastStatus() == nil { 940 t.Error("no LastStatus") 941 } 942 rit, err = job2.Read(ctx) 943 if err != nil { 944 t.Fatal(err) 945 } 946 checkRead(t, "job.Read", rit, wantRows) 947 948 // Get statistics. 949 jobStatus, err := job2.Status(ctx) 950 if err != nil { 951 t.Fatal(err) 952 } 953 if jobStatus.Statistics == nil { 954 t.Fatal("jobStatus missing statistics") 955 } 956 if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok { 957 t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details) 958 } 959 960 // Test reading directly into a []Value. 961 valueLists, schema, _, err := readAll(table.Read(ctx)) 962 if err != nil { 963 t.Fatal(err) 964 } 965 it := table.Read(ctx) 966 for i, vl := range valueLists { 967 var got []Value 968 if err := it.Next(&got); err != nil { 969 t.Fatal(err) 970 } 971 if !testutil.Equal(it.Schema, schema) { 972 t.Fatalf("got schema %v, want %v", it.Schema, schema) 973 } 974 want := []Value(vl) 975 if !testutil.Equal(got, want) { 976 t.Errorf("%d: got %v, want %v", i, got, want) 977 } 978 } 979 980 // Test reading into a map. 981 it = table.Read(ctx) 982 for _, vl := range valueLists { 983 var vm map[string]Value 984 if err := it.Next(&vm); err != nil { 985 t.Fatal(err) 986 } 987 if got, want := len(vm), len(vl); got != want { 988 t.Fatalf("valueMap len: got %d, want %d", got, want) 989 } 990 // With maps, structs become nested maps. 991 vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]} 992 for i, v := range vl { 993 if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) { 994 t.Errorf("%d, name=%s: got %#v, want %#v", 995 i, schema[i].Name, got, want) 996 } 997 } 998 } 999 1000} 1001 1002type SubSubTestStruct struct { 1003 Integer int64 1004} 1005 1006type SubTestStruct struct { 1007 String string 1008 Record SubSubTestStruct 1009 RecordArray []SubSubTestStruct 1010} 1011 1012type TestStruct struct { 1013 Name string 1014 Bytes []byte 1015 Integer int64 1016 Float float64 1017 Boolean bool 1018 Timestamp time.Time 1019 Date civil.Date 1020 Time civil.Time 1021 DateTime civil.DateTime 1022 Numeric *big.Rat 1023 Geography string 1024 1025 StringArray []string 1026 IntegerArray []int64 1027 FloatArray []float64 1028 BooleanArray []bool 1029 TimestampArray []time.Time 1030 DateArray []civil.Date 1031 TimeArray []civil.Time 1032 DateTimeArray []civil.DateTime 1033 NumericArray []*big.Rat 1034 GeographyArray []string 1035 1036 Record SubTestStruct 1037 RecordArray []SubTestStruct 1038} 1039 1040// Round times to the microsecond for comparison purposes. 1041var roundToMicros = cmp.Transformer("RoundToMicros", 1042 func(t time.Time) time.Time { return t.Round(time.Microsecond) }) 1043 1044func TestIntegration_InsertAndReadStructs(t *testing.T) { 1045 if client == nil { 1046 t.Skip("Integration tests skipped") 1047 } 1048 schema, err := InferSchema(TestStruct{}) 1049 if err != nil { 1050 t.Fatal(err) 1051 } 1052 1053 ctx := context.Background() 1054 table := newTable(t, schema) 1055 defer table.Delete(ctx) 1056 1057 d := civil.Date{Year: 2016, Month: 3, Day: 20} 1058 tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000} 1059 ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC) 1060 dtm := civil.DateTime{Date: d, Time: tm} 1061 d2 := civil.Date{Year: 1994, Month: 5, Day: 15} 1062 tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0} 1063 ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC) 1064 dtm2 := civil.DateTime{Date: d2, Time: tm2} 1065 g := "POINT(-122.350220 47.649154)" 1066 g2 := "POINT(-122.0836791 37.421827)" 1067 1068 // Populate the table. 1069 ins := table.Inserter() 1070 want := []*TestStruct{ 1071 { 1072 "a", 1073 []byte("byte"), 1074 42, 1075 3.14, 1076 true, 1077 ts, 1078 d, 1079 tm, 1080 dtm, 1081 big.NewRat(57, 100), 1082 g, 1083 []string{"a", "b"}, 1084 []int64{1, 2}, 1085 []float64{1, 1.41}, 1086 []bool{true, false}, 1087 []time.Time{ts, ts2}, 1088 []civil.Date{d, d2}, 1089 []civil.Time{tm, tm2}, 1090 []civil.DateTime{dtm, dtm2}, 1091 []*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)}, 1092 []string{g, g2}, 1093 SubTestStruct{ 1094 "string", 1095 SubSubTestStruct{24}, 1096 []SubSubTestStruct{{1}, {2}}, 1097 }, 1098 []SubTestStruct{ 1099 {String: "empty"}, 1100 { 1101 "full", 1102 SubSubTestStruct{1}, 1103 []SubSubTestStruct{{1}, {2}}, 1104 }, 1105 }, 1106 }, 1107 { 1108 Name: "b", 1109 Bytes: []byte("byte2"), 1110 Integer: 24, 1111 Float: 4.13, 1112 Boolean: false, 1113 Timestamp: ts, 1114 Date: d, 1115 Time: tm, 1116 DateTime: dtm, 1117 Numeric: big.NewRat(4499, 10000), 1118 }, 1119 } 1120 var savers []*StructSaver 1121 for _, s := range want { 1122 savers = append(savers, &StructSaver{Schema: schema, Struct: s}) 1123 } 1124 if err := ins.Put(ctx, savers); err != nil { 1125 t.Fatal(putError(err)) 1126 } 1127 1128 // Wait until the data has been uploaded. This can take a few seconds, according 1129 // to https://cloud.google.com/bigquery/streaming-data-into-bigquery. 1130 if err := waitForRow(ctx, table); err != nil { 1131 t.Fatal(err) 1132 } 1133 1134 // Test iteration with structs. 1135 it := table.Read(ctx) 1136 var got []*TestStruct 1137 for { 1138 var g TestStruct 1139 err := it.Next(&g) 1140 if err == iterator.Done { 1141 break 1142 } 1143 if err != nil { 1144 t.Fatal(err) 1145 } 1146 got = append(got, &g) 1147 } 1148 sort.Sort(byName(got)) 1149 1150 // BigQuery does not elide nils. It reports an error for nil fields. 1151 for i, g := range got { 1152 if i >= len(want) { 1153 t.Errorf("%d: got %v, past end of want", i, pretty.Value(g)) 1154 } else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" { 1155 t.Errorf("%d: got=-, want=+:\n%s", i, diff) 1156 } 1157 } 1158} 1159 1160type byName []*TestStruct 1161 1162func (b byName) Len() int { return len(b) } 1163func (b byName) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1164func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name } 1165 1166func TestIntegration_InsertAndReadNullable(t *testing.T) { 1167 if client == nil { 1168 t.Skip("Integration tests skipped") 1169 } 1170 ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000} 1171 cdt := civil.DateTime{Date: testDate, Time: ctm} 1172 rat := big.NewRat(33, 100) 1173 geo := "POINT(-122.198939 47.669865)" 1174 1175 // Nil fields in the struct. 1176 testInsertAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema))) 1177 1178 // Explicitly invalidate the Null* types within the struct. 1179 testInsertAndReadNullable(t, testStructNullable{ 1180 String: NullString{Valid: false}, 1181 Integer: NullInt64{Valid: false}, 1182 Float: NullFloat64{Valid: false}, 1183 Boolean: NullBool{Valid: false}, 1184 Timestamp: NullTimestamp{Valid: false}, 1185 Date: NullDate{Valid: false}, 1186 Time: NullTime{Valid: false}, 1187 DateTime: NullDateTime{Valid: false}, 1188 Geography: NullGeography{Valid: false}, 1189 }, 1190 make([]Value, len(testStructNullableSchema))) 1191 1192 // Populate the struct with values. 1193 testInsertAndReadNullable(t, testStructNullable{ 1194 String: NullString{"x", true}, 1195 Bytes: []byte{1, 2, 3}, 1196 Integer: NullInt64{1, true}, 1197 Float: NullFloat64{2.3, true}, 1198 Boolean: NullBool{true, true}, 1199 Timestamp: NullTimestamp{testTimestamp, true}, 1200 Date: NullDate{testDate, true}, 1201 Time: NullTime{ctm, true}, 1202 DateTime: NullDateTime{cdt, true}, 1203 Numeric: rat, 1204 Geography: NullGeography{geo, true}, 1205 Record: &subNullable{X: NullInt64{4, true}}, 1206 }, 1207 []Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, geo, []Value{int64(4)}}) 1208} 1209 1210func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) { 1211 ctx := context.Background() 1212 table := newTable(t, testStructNullableSchema) 1213 defer table.Delete(ctx) 1214 1215 // Populate the table. 1216 ins := table.Inserter() 1217 if err := ins.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil { 1218 t.Fatal(putError(err)) 1219 } 1220 // Wait until the data has been uploaded. This can take a few seconds, according 1221 // to https://cloud.google.com/bigquery/streaming-data-into-bigquery. 1222 if err := waitForRow(ctx, table); err != nil { 1223 t.Fatal(err) 1224 } 1225 1226 // Read into a []Value. 1227 iter := table.Read(ctx) 1228 gotRows, _, _, err := readAll(iter) 1229 if err != nil { 1230 t.Fatal(err) 1231 } 1232 if len(gotRows) != 1 { 1233 t.Fatalf("got %d rows, want 1", len(gotRows)) 1234 } 1235 if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" { 1236 t.Error(diff) 1237 } 1238 1239 // Read into a struct. 1240 want := ts 1241 var sn testStructNullable 1242 it := table.Read(ctx) 1243 if err := it.Next(&sn); err != nil { 1244 t.Fatal(err) 1245 } 1246 if diff := testutil.Diff(sn, want, roundToMicros); diff != "" { 1247 t.Error(diff) 1248 } 1249} 1250 1251func TestIntegration_TableUpdate(t *testing.T) { 1252 if client == nil { 1253 t.Skip("Integration tests skipped") 1254 } 1255 ctx := context.Background() 1256 table := newTable(t, schema) 1257 defer table.Delete(ctx) 1258 1259 // Test Update of non-schema fields. 1260 tm, err := table.Metadata(ctx) 1261 if err != nil { 1262 t.Fatal(err) 1263 } 1264 wantDescription := tm.Description + "more" 1265 wantName := tm.Name + "more" 1266 wantExpiration := tm.ExpirationTime.Add(time.Hour * 24) 1267 got, err := table.Update(ctx, TableMetadataToUpdate{ 1268 Description: wantDescription, 1269 Name: wantName, 1270 ExpirationTime: wantExpiration, 1271 }, tm.ETag) 1272 if err != nil { 1273 t.Fatal(err) 1274 } 1275 if got.Description != wantDescription { 1276 t.Errorf("Description: got %q, want %q", got.Description, wantDescription) 1277 } 1278 if got.Name != wantName { 1279 t.Errorf("Name: got %q, want %q", got.Name, wantName) 1280 } 1281 if got.ExpirationTime != wantExpiration { 1282 t.Errorf("ExpirationTime: got %q, want %q", got.ExpirationTime, wantExpiration) 1283 } 1284 if !testutil.Equal(got.Schema, schema) { 1285 t.Errorf("Schema: got %v, want %v", pretty.Value(got.Schema), pretty.Value(schema)) 1286 } 1287 1288 // Blind write succeeds. 1289 _, err = table.Update(ctx, TableMetadataToUpdate{Name: "x"}, "") 1290 if err != nil { 1291 t.Fatal(err) 1292 } 1293 // Write with old etag fails. 1294 _, err = table.Update(ctx, TableMetadataToUpdate{Name: "y"}, got.ETag) 1295 if err == nil { 1296 t.Fatal("Update with old ETag succeeded, wanted failure") 1297 } 1298 1299 // Test schema update. 1300 // Columns can be added. schema2 is the same as schema, except for the 1301 // added column in the middle. 1302 nested := Schema{ 1303 {Name: "nested", Type: BooleanFieldType}, 1304 {Name: "other", Type: StringFieldType}, 1305 } 1306 schema2 := Schema{ 1307 schema[0], 1308 {Name: "rec2", Type: RecordFieldType, Schema: nested}, 1309 schema[1], 1310 schema[2], 1311 } 1312 1313 got, err = table.Update(ctx, TableMetadataToUpdate{Schema: schema2}, "") 1314 if err != nil { 1315 t.Fatal(err) 1316 } 1317 1318 // Wherever you add the column, it appears at the end. 1319 schema3 := Schema{schema2[0], schema2[2], schema2[3], schema2[1]} 1320 if !testutil.Equal(got.Schema, schema3) { 1321 t.Errorf("add field:\ngot %v\nwant %v", 1322 pretty.Value(got.Schema), pretty.Value(schema3)) 1323 } 1324 1325 // Updating with the empty schema succeeds, but is a no-op. 1326 got, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema{}}, "") 1327 if err != nil { 1328 t.Fatal(err) 1329 } 1330 if !testutil.Equal(got.Schema, schema3) { 1331 t.Errorf("empty schema:\ngot %v\nwant %v", 1332 pretty.Value(got.Schema), pretty.Value(schema3)) 1333 } 1334 1335 // Error cases when updating schema. 1336 for _, test := range []struct { 1337 desc string 1338 fields Schema 1339 }{ 1340 {"change from optional to required", Schema{ 1341 {Name: "name", Type: StringFieldType, Required: true}, 1342 schema3[1], 1343 schema3[2], 1344 schema3[3], 1345 }}, 1346 {"add a required field", Schema{ 1347 schema3[0], schema3[1], schema3[2], schema3[3], 1348 {Name: "req", Type: StringFieldType, Required: true}, 1349 }}, 1350 {"remove a field", Schema{schema3[0], schema3[1], schema3[2]}}, 1351 {"remove a nested field", Schema{ 1352 schema3[0], schema3[1], schema3[2], 1353 {Name: "rec2", Type: RecordFieldType, Schema: Schema{nested[0]}}}}, 1354 {"remove all nested fields", Schema{ 1355 schema3[0], schema3[1], schema3[2], 1356 {Name: "rec2", Type: RecordFieldType, Schema: Schema{}}}}, 1357 } { 1358 _, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema(test.fields)}, "") 1359 if err == nil { 1360 t.Errorf("%s: want error, got nil", test.desc) 1361 } else if !hasStatusCode(err, 400) { 1362 t.Errorf("%s: want 400, got %v", test.desc, err) 1363 } 1364 } 1365} 1366 1367func TestIntegration_Load(t *testing.T) { 1368 if client == nil { 1369 t.Skip("Integration tests skipped") 1370 } 1371 ctx := context.Background() 1372 // CSV data can't be loaded into a repeated field, so we use a different schema. 1373 table := newTable(t, Schema{ 1374 {Name: "name", Type: StringFieldType}, 1375 {Name: "nums", Type: IntegerFieldType}, 1376 }) 1377 defer table.Delete(ctx) 1378 1379 // Load the table from a reader. 1380 r := strings.NewReader("a,0\nb,1\nc,2\n") 1381 wantRows := [][]Value{ 1382 {"a", int64(0)}, 1383 {"b", int64(1)}, 1384 {"c", int64(2)}, 1385 } 1386 rs := NewReaderSource(r) 1387 loader := table.LoaderFrom(rs) 1388 loader.WriteDisposition = WriteTruncate 1389 loader.Labels = map[string]string{"test": "go"} 1390 job, err := loader.Run(ctx) 1391 if err != nil { 1392 t.Fatal(err) 1393 } 1394 if job.LastStatus() == nil { 1395 t.Error("no LastStatus") 1396 } 1397 conf, err := job.Config() 1398 if err != nil { 1399 t.Fatal(err) 1400 } 1401 config, ok := conf.(*LoadConfig) 1402 if !ok { 1403 t.Fatalf("got %T, want LoadConfig", conf) 1404 } 1405 diff := testutil.Diff(config, &loader.LoadConfig, 1406 cmp.AllowUnexported(Table{}), 1407 cmpopts.IgnoreUnexported(Client{}, ReaderSource{}), 1408 // returned schema is at top level, not in the config 1409 cmpopts.IgnoreFields(FileConfig{}, "Schema")) 1410 if diff != "" { 1411 t.Errorf("got=-, want=+:\n%s", diff) 1412 } 1413 if err := wait(ctx, job); err != nil { 1414 t.Fatal(err) 1415 } 1416 checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows) 1417 1418} 1419 1420func TestIntegration_DML(t *testing.T) { 1421 if client == nil { 1422 t.Skip("Integration tests skipped") 1423 } 1424 ctx := context.Background() 1425 table := newTable(t, schema) 1426 defer table.Delete(ctx) 1427 1428 sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec) 1429 VALUES ('a', [0], STRUCT<BOOL>(TRUE)), 1430 ('b', [1], STRUCT<BOOL>(FALSE)), 1431 ('c', [2], STRUCT<BOOL>(TRUE))`, 1432 table.DatasetID, table.TableID) 1433 if err := runQueryJob(ctx, sql); err != nil { 1434 t.Fatal(err) 1435 } 1436 wantRows := [][]Value{ 1437 {"a", []Value{int64(0)}, []Value{true}}, 1438 {"b", []Value{int64(1)}, []Value{false}}, 1439 {"c", []Value{int64(2)}, []Value{true}}, 1440 } 1441 checkRead(t, "DML", table.Read(ctx), wantRows) 1442} 1443 1444// runQueryJob is useful for running queries where no row data is returned (DDL/DML). 1445func runQueryJob(ctx context.Context, sql string) error { 1446 return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { 1447 job, err := client.Query(sql).Run(ctx) 1448 if err != nil { 1449 if e, ok := err.(*googleapi.Error); ok && e.Code < 500 { 1450 return true, err // fail on 4xx 1451 } 1452 return false, err 1453 } 1454 _, err = job.Wait(ctx) 1455 if err != nil { 1456 if e, ok := err.(*googleapi.Error); ok && e.Code < 500 { 1457 return true, err // fail on 4xx 1458 } 1459 return false, err 1460 } 1461 return true, nil 1462 }) 1463} 1464 1465func TestIntegration_TimeTypes(t *testing.T) { 1466 if client == nil { 1467 t.Skip("Integration tests skipped") 1468 } 1469 ctx := context.Background() 1470 dtSchema := Schema{ 1471 {Name: "d", Type: DateFieldType}, 1472 {Name: "t", Type: TimeFieldType}, 1473 {Name: "dt", Type: DateTimeFieldType}, 1474 {Name: "ts", Type: TimestampFieldType}, 1475 } 1476 table := newTable(t, dtSchema) 1477 defer table.Delete(ctx) 1478 1479 d := civil.Date{Year: 2016, Month: 3, Day: 20} 1480 tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000} 1481 dtm := civil.DateTime{Date: d, Time: tm} 1482 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1483 wantRows := [][]Value{ 1484 {d, tm, dtm, ts}, 1485 } 1486 ins := table.Inserter() 1487 if err := ins.Put(ctx, []*ValuesSaver{ 1488 {Schema: dtSchema, Row: wantRows[0]}, 1489 }); err != nil { 1490 t.Fatal(putError(err)) 1491 } 1492 if err := waitForRow(ctx, table); err != nil { 1493 t.Fatal(err) 1494 } 1495 1496 // SQL wants DATETIMEs with a space between date and time, but the service 1497 // returns them in RFC3339 form, with a "T" between. 1498 query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+ 1499 "VALUES ('%s', '%s', '%s', '%s')", 1500 table.DatasetID, table.TableID, 1501 d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05")) 1502 if err := runQueryJob(ctx, query); err != nil { 1503 t.Fatal(err) 1504 } 1505 wantRows = append(wantRows, wantRows[0]) 1506 checkRead(t, "TimeTypes", table.Read(ctx), wantRows) 1507} 1508 1509func TestIntegration_StandardQuery(t *testing.T) { 1510 if client == nil { 1511 t.Skip("Integration tests skipped") 1512 } 1513 ctx := context.Background() 1514 1515 d := civil.Date{Year: 2016, Month: 3, Day: 20} 1516 tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0} 1517 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1518 dtm := ts.Format("2006-01-02 15:04:05") 1519 1520 // Constructs Value slices made up of int64s. 1521 ints := func(args ...int) []Value { 1522 vals := make([]Value, len(args)) 1523 for i, arg := range args { 1524 vals[i] = int64(arg) 1525 } 1526 return vals 1527 } 1528 1529 testCases := []struct { 1530 query string 1531 wantRow []Value 1532 }{ 1533 {"SELECT 1", ints(1)}, 1534 {"SELECT 1.3", []Value{1.3}}, 1535 {"SELECT CAST(1.3 AS NUMERIC)", []Value{big.NewRat(13, 10)}}, 1536 {"SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}}, 1537 {"SELECT TRUE", []Value{true}}, 1538 {"SELECT 'ABC'", []Value{"ABC"}}, 1539 {"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}}, 1540 {fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}}, 1541 {fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}}, 1542 {fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}}, 1543 {fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}}, 1544 {fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}}, 1545 {fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}}, 1546 {"SELECT (1, 2)", []Value{ints(1, 2)}}, 1547 {"SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}}, 1548 {"SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}}, 1549 {"SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}}, 1550 {"SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}}, 1551 {"SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}}, 1552 } 1553 for _, c := range testCases { 1554 q := client.Query(c.query) 1555 it, err := q.Read(ctx) 1556 if err != nil { 1557 t.Fatal(err) 1558 } 1559 checkRead(t, "StandardQuery", it, [][]Value{c.wantRow}) 1560 } 1561} 1562 1563func TestIntegration_LegacyQuery(t *testing.T) { 1564 if client == nil { 1565 t.Skip("Integration tests skipped") 1566 } 1567 ctx := context.Background() 1568 1569 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1570 dtm := ts.Format("2006-01-02 15:04:05") 1571 1572 testCases := []struct { 1573 query string 1574 wantRow []Value 1575 }{ 1576 {"SELECT 1", []Value{int64(1)}}, 1577 {"SELECT 1.3", []Value{1.3}}, 1578 {"SELECT TRUE", []Value{true}}, 1579 {"SELECT 'ABC'", []Value{"ABC"}}, 1580 {"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}}, 1581 {fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}}, 1582 {fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}}, 1583 {fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}}, 1584 } 1585 for _, c := range testCases { 1586 q := client.Query(c.query) 1587 q.UseLegacySQL = true 1588 it, err := q.Read(ctx) 1589 if err != nil { 1590 t.Fatal(err) 1591 } 1592 checkRead(t, "LegacyQuery", it, [][]Value{c.wantRow}) 1593 } 1594} 1595 1596func TestIntegration_QueryParameters(t *testing.T) { 1597 if client == nil { 1598 t.Skip("Integration tests skipped") 1599 } 1600 ctx := context.Background() 1601 1602 d := civil.Date{Year: 2016, Month: 3, Day: 20} 1603 tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008} 1604 rtm := tm 1605 rtm.Nanosecond = 3000 // round to microseconds 1606 dtm := civil.DateTime{Date: d, Time: tm} 1607 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC) 1608 rat := big.NewRat(13, 10) 1609 1610 type ss struct { 1611 String string 1612 } 1613 1614 type s struct { 1615 Timestamp time.Time 1616 StringArray []string 1617 SubStruct ss 1618 SubStructArray []ss 1619 } 1620 1621 testCases := []struct { 1622 query string 1623 parameters []QueryParameter 1624 wantRow []Value 1625 wantConfig interface{} 1626 }{ 1627 { 1628 "SELECT @val", 1629 []QueryParameter{{"val", 1}}, 1630 []Value{int64(1)}, 1631 int64(1), 1632 }, 1633 { 1634 "SELECT @val", 1635 []QueryParameter{{"val", 1.3}}, 1636 []Value{1.3}, 1637 1.3, 1638 }, 1639 { 1640 "SELECT @val", 1641 []QueryParameter{{"val", rat}}, 1642 []Value{rat}, 1643 rat, 1644 }, 1645 { 1646 "SELECT @val", 1647 []QueryParameter{{"val", true}}, 1648 []Value{true}, 1649 true, 1650 }, 1651 { 1652 "SELECT @val", 1653 []QueryParameter{{"val", "ABC"}}, 1654 []Value{"ABC"}, 1655 "ABC", 1656 }, 1657 { 1658 "SELECT @val", 1659 []QueryParameter{{"val", []byte("foo")}}, 1660 []Value{[]byte("foo")}, 1661 []byte("foo"), 1662 }, 1663 { 1664 "SELECT @val", 1665 []QueryParameter{{"val", ts}}, 1666 []Value{ts}, 1667 ts, 1668 }, 1669 { 1670 "SELECT @val", 1671 []QueryParameter{{"val", []time.Time{ts, ts}}}, 1672 []Value{[]Value{ts, ts}}, 1673 []interface{}{ts, ts}, 1674 }, 1675 { 1676 "SELECT @val", 1677 []QueryParameter{{"val", dtm}}, 1678 []Value{civil.DateTime{Date: d, Time: rtm}}, 1679 civil.DateTime{Date: d, Time: rtm}, 1680 }, 1681 { 1682 "SELECT @val", 1683 []QueryParameter{{"val", d}}, 1684 []Value{d}, 1685 d, 1686 }, 1687 { 1688 "SELECT @val", 1689 []QueryParameter{{"val", tm}}, 1690 []Value{rtm}, 1691 rtm, 1692 }, 1693 { 1694 "SELECT @val", 1695 []QueryParameter{{"val", s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}}, 1696 []Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}}, 1697 map[string]interface{}{ 1698 "Timestamp": ts, 1699 "StringArray": []interface{}{"a", "b"}, 1700 "SubStruct": map[string]interface{}{"String": "c"}, 1701 "SubStructArray": []interface{}{ 1702 map[string]interface{}{"String": "d"}, 1703 map[string]interface{}{"String": "e"}, 1704 }, 1705 }, 1706 }, 1707 { 1708 "SELECT @val.Timestamp, @val.SubStruct.String", 1709 []QueryParameter{{"val", s{Timestamp: ts, SubStruct: ss{"a"}}}}, 1710 []Value{ts, "a"}, 1711 map[string]interface{}{ 1712 "Timestamp": ts, 1713 "SubStruct": map[string]interface{}{"String": "a"}, 1714 "StringArray": nil, 1715 "SubStructArray": nil, 1716 }, 1717 }, 1718 } 1719 for _, c := range testCases { 1720 q := client.Query(c.query) 1721 q.Parameters = c.parameters 1722 job, err := q.Run(ctx) 1723 if err != nil { 1724 t.Fatal(err) 1725 } 1726 if job.LastStatus() == nil { 1727 t.Error("no LastStatus") 1728 } 1729 it, err := job.Read(ctx) 1730 if err != nil { 1731 t.Fatal(err) 1732 } 1733 checkRead(t, "QueryParameters", it, [][]Value{c.wantRow}) 1734 config, err := job.Config() 1735 if err != nil { 1736 t.Fatal(err) 1737 } 1738 got := config.(*QueryConfig).Parameters[0].Value 1739 if !testutil.Equal(got, c.wantConfig) { 1740 t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)", 1741 c.parameters[0].Value, got, c.wantConfig) 1742 } 1743 } 1744} 1745 1746func TestIntegration_QueryDryRun(t *testing.T) { 1747 if client == nil { 1748 t.Skip("Integration tests skipped") 1749 } 1750 ctx := context.Background() 1751 q := client.Query("SELECT word from " + stdName + " LIMIT 10") 1752 q.DryRun = true 1753 job, err := q.Run(ctx) 1754 if err != nil { 1755 t.Fatal(err) 1756 } 1757 1758 s := job.LastStatus() 1759 if s.State != Done { 1760 t.Errorf("state is %v, expected Done", s.State) 1761 } 1762 if s.Statistics == nil { 1763 t.Fatal("no statistics") 1764 } 1765 if s.Statistics.Details.(*QueryStatistics).Schema == nil { 1766 t.Fatal("no schema") 1767 } 1768 if s.Statistics.Details.(*QueryStatistics).TotalBytesProcessedAccuracy == "" { 1769 t.Fatal("no cost accuracy") 1770 } 1771} 1772 1773func TestIntegration_Scripting(t *testing.T) { 1774 if client == nil { 1775 t.Skip("Integration tests skipped") 1776 } 1777 ctx := context.Background() 1778 sql := ` 1779 -- Declare a variable to hold names as an array. 1780 DECLARE top_names ARRAY<STRING>; 1781 -- Build an array of the top 100 names from the year 2017. 1782 SET top_names = ( 1783 SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100) 1784 FROM ` + "`bigquery-public-data`" + `.usa_names.usa_1910_current 1785 WHERE year = 2017 1786 ); 1787 -- Which names appear as words in Shakespeare's plays? 1788 SELECT 1789 name AS shakespeare_name 1790 FROM UNNEST(top_names) AS name 1791 WHERE name IN ( 1792 SELECT word 1793 FROM ` + "`bigquery-public-data`" + `.samples.shakespeare 1794 ); 1795 ` 1796 q := client.Query(sql) 1797 job, err := q.Run(ctx) 1798 if err != nil { 1799 t.Fatalf("failed to run parent job: %v", err) 1800 } 1801 status, err := job.Wait(ctx) 1802 if err != nil { 1803 t.Fatalf("failed to wait for completion: %v", err) 1804 } 1805 if status.Err() != nil { 1806 t.Fatalf("job terminated with error: %v", err) 1807 } 1808 1809 queryStats, ok := status.Statistics.Details.(*QueryStatistics) 1810 if !ok { 1811 t.Fatalf("failed to fetch query statistics") 1812 } 1813 1814 want := "SCRIPT" 1815 if queryStats.StatementType != want { 1816 t.Errorf("statement type mismatch. got %s want %s", queryStats.StatementType, want) 1817 } 1818 1819 if status.Statistics.NumChildJobs <= 0 { 1820 t.Errorf("expected script to indicate nonzero child jobs, got %d", status.Statistics.NumChildJobs) 1821 } 1822 1823 // Ensure child jobs are present. 1824 var childJobs []*Job 1825 1826 it := job.Children(ctx) 1827 for { 1828 job, err := it.Next() 1829 if err == iterator.Done { 1830 break 1831 } 1832 if err != nil { 1833 t.Fatal(err) 1834 } 1835 childJobs = append(childJobs, job) 1836 } 1837 if len(childJobs) == 0 { 1838 t.Fatal("Script had no child jobs.") 1839 } 1840 1841 for _, cj := range childJobs { 1842 cStatus := cj.LastStatus() 1843 if cStatus.Statistics.ParentJobID != job.ID() { 1844 t.Errorf("child job %q doesn't indicate parent. got %q, want %q", cj.ID(), cStatus.Statistics.ParentJobID, job.ID()) 1845 } 1846 if cStatus.Statistics.ScriptStatistics == nil { 1847 t.Errorf("child job %q doesn't have script statistics present", cj.ID()) 1848 } 1849 if cStatus.Statistics.ScriptStatistics.EvaluationKind == "" { 1850 t.Errorf("child job %q didn't indicate evaluation kind", cj.ID()) 1851 } 1852 } 1853 1854} 1855 1856func TestIntegration_ExtractExternal(t *testing.T) { 1857 // Create a table, extract it to GCS, then query it externally. 1858 if client == nil { 1859 t.Skip("Integration tests skipped") 1860 } 1861 ctx := context.Background() 1862 schema := Schema{ 1863 {Name: "name", Type: StringFieldType}, 1864 {Name: "num", Type: IntegerFieldType}, 1865 } 1866 table := newTable(t, schema) 1867 defer table.Delete(ctx) 1868 1869 // Insert table data. 1870 sql := fmt.Sprintf(`INSERT %s.%s (name, num) 1871 VALUES ('a', 1), ('b', 2), ('c', 3)`, 1872 table.DatasetID, table.TableID) 1873 if err := runQueryJob(ctx, sql); err != nil { 1874 t.Fatal(err) 1875 } 1876 // Extract to a GCS object as CSV. 1877 bucketName := testutil.ProjID() 1878 objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID) 1879 uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName) 1880 defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx) 1881 gr := NewGCSReference(uri) 1882 gr.DestinationFormat = CSV 1883 e := table.ExtractorTo(gr) 1884 job, err := e.Run(ctx) 1885 if err != nil { 1886 t.Fatal(err) 1887 } 1888 conf, err := job.Config() 1889 if err != nil { 1890 t.Fatal(err) 1891 } 1892 config, ok := conf.(*ExtractConfig) 1893 if !ok { 1894 t.Fatalf("got %T, want ExtractConfig", conf) 1895 } 1896 diff := testutil.Diff(config, &e.ExtractConfig, 1897 cmp.AllowUnexported(Table{}), 1898 cmpopts.IgnoreUnexported(Client{})) 1899 if diff != "" { 1900 t.Errorf("got=-, want=+:\n%s", diff) 1901 } 1902 if err := wait(ctx, job); err != nil { 1903 t.Fatal(err) 1904 } 1905 1906 edc := &ExternalDataConfig{ 1907 SourceFormat: CSV, 1908 SourceURIs: []string{uri}, 1909 Schema: schema, 1910 Options: &CSVOptions{ 1911 SkipLeadingRows: 1, 1912 // This is the default. Since we use edc as an expectation later on, 1913 // let's just be explicit. 1914 FieldDelimiter: ",", 1915 }, 1916 } 1917 // Query that CSV file directly. 1918 q := client.Query("SELECT * FROM csv") 1919 q.TableDefinitions = map[string]ExternalData{"csv": edc} 1920 wantRows := [][]Value{ 1921 {"a", int64(1)}, 1922 {"b", int64(2)}, 1923 {"c", int64(3)}, 1924 } 1925 iter, err := q.Read(ctx) 1926 if err != nil { 1927 t.Fatal(err) 1928 } 1929 checkReadAndTotalRows(t, "external query", iter, wantRows) 1930 1931 // Make a table pointing to the file, and query it. 1932 // BigQuery does not allow a Table.Read on an external table. 1933 table = dataset.Table(tableIDs.New()) 1934 err = table.Create(context.Background(), &TableMetadata{ 1935 Schema: schema, 1936 ExpirationTime: testTableExpiration, 1937 ExternalDataConfig: edc, 1938 }) 1939 if err != nil { 1940 t.Fatal(err) 1941 } 1942 q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID)) 1943 iter, err = q.Read(ctx) 1944 if err != nil { 1945 t.Fatal(err) 1946 } 1947 checkReadAndTotalRows(t, "external table", iter, wantRows) 1948 1949 // While we're here, check that the table metadata is correct. 1950 md, err := table.Metadata(ctx) 1951 if err != nil { 1952 t.Fatal(err) 1953 } 1954 // One difference: since BigQuery returns the schema as part of the ordinary 1955 // table metadata, it does not populate ExternalDataConfig.Schema. 1956 md.ExternalDataConfig.Schema = md.Schema 1957 if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" { 1958 t.Errorf("got=-, want=+\n%s", diff) 1959 } 1960} 1961 1962func TestIntegration_ReadNullIntoStruct(t *testing.T) { 1963 // Reading a null into a struct field should return an error (not panic). 1964 if client == nil { 1965 t.Skip("Integration tests skipped") 1966 } 1967 ctx := context.Background() 1968 table := newTable(t, schema) 1969 defer table.Delete(ctx) 1970 1971 ins := table.Inserter() 1972 row := &ValuesSaver{ 1973 Schema: schema, 1974 Row: []Value{nil, []Value{}, []Value{nil}}, 1975 } 1976 if err := ins.Put(ctx, []*ValuesSaver{row}); err != nil { 1977 t.Fatal(putError(err)) 1978 } 1979 if err := waitForRow(ctx, table); err != nil { 1980 t.Fatal(err) 1981 } 1982 1983 q := client.Query(fmt.Sprintf("select name from %s", table.TableID)) 1984 q.DefaultProjectID = dataset.ProjectID 1985 q.DefaultDatasetID = dataset.DatasetID 1986 it, err := q.Read(ctx) 1987 if err != nil { 1988 t.Fatal(err) 1989 } 1990 type S struct{ Name string } 1991 var s S 1992 if err := it.Next(&s); err == nil { 1993 t.Fatal("got nil, want error") 1994 } 1995} 1996 1997const ( 1998 stdName = "`bigquery-public-data.samples.shakespeare`" 1999 legacyName = "[bigquery-public-data:samples.shakespeare]" 2000) 2001 2002// These tests exploit the fact that the two SQL versions have different syntaxes for 2003// fully-qualified table names. 2004var useLegacySQLTests = []struct { 2005 t string // name of table 2006 std, legacy bool // use standard/legacy SQL 2007 err bool // do we expect an error? 2008}{ 2009 {t: legacyName, std: false, legacy: true, err: false}, 2010 {t: legacyName, std: true, legacy: false, err: true}, 2011 {t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default 2012 {t: legacyName, std: true, legacy: true, err: true}, 2013 {t: stdName, std: false, legacy: true, err: true}, 2014 {t: stdName, std: true, legacy: false, err: false}, 2015 {t: stdName, std: false, legacy: false, err: false}, // standard SQL is default 2016 {t: stdName, std: true, legacy: true, err: true}, 2017} 2018 2019func TestIntegration_QueryUseLegacySQL(t *testing.T) { 2020 // Test the UseLegacySQL and UseStandardSQL options for queries. 2021 if client == nil { 2022 t.Skip("Integration tests skipped") 2023 } 2024 ctx := context.Background() 2025 for _, test := range useLegacySQLTests { 2026 q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t)) 2027 q.UseStandardSQL = test.std 2028 q.UseLegacySQL = test.legacy 2029 _, err := q.Read(ctx) 2030 gotErr := err != nil 2031 if gotErr && !test.err { 2032 t.Errorf("%+v:\nunexpected error: %v", test, err) 2033 } else if !gotErr && test.err { 2034 t.Errorf("%+v:\nsucceeded, but want error", test) 2035 } 2036 } 2037} 2038 2039func TestIntegration_TableUseLegacySQL(t *testing.T) { 2040 // Test UseLegacySQL and UseStandardSQL for Table.Create. 2041 if client == nil { 2042 t.Skip("Integration tests skipped") 2043 } 2044 ctx := context.Background() 2045 table := newTable(t, schema) 2046 defer table.Delete(ctx) 2047 for i, test := range useLegacySQLTests { 2048 view := dataset.Table(fmt.Sprintf("t_view_%d", i)) 2049 tm := &TableMetadata{ 2050 ViewQuery: fmt.Sprintf("SELECT word from %s", test.t), 2051 UseStandardSQL: test.std, 2052 UseLegacySQL: test.legacy, 2053 } 2054 err := view.Create(ctx, tm) 2055 gotErr := err != nil 2056 if gotErr && !test.err { 2057 t.Errorf("%+v:\nunexpected error: %v", test, err) 2058 } else if !gotErr && test.err { 2059 t.Errorf("%+v:\nsucceeded, but want error", test) 2060 } 2061 _ = view.Delete(ctx) 2062 } 2063} 2064 2065func TestIntegration_ListJobs(t *testing.T) { 2066 // It's difficult to test the list of jobs, because we can't easily 2067 // control what's in it. Also, there are many jobs in the test project, 2068 // and it takes considerable time to list them all. 2069 if client == nil { 2070 t.Skip("Integration tests skipped") 2071 } 2072 ctx := context.Background() 2073 2074 // About all we can do is list a few jobs. 2075 const max = 20 2076 var jobs []*Job 2077 it := client.Jobs(ctx) 2078 for { 2079 job, err := it.Next() 2080 if err == iterator.Done { 2081 break 2082 } 2083 if err != nil { 2084 t.Fatal(err) 2085 } 2086 jobs = append(jobs, job) 2087 if len(jobs) >= max { 2088 break 2089 } 2090 } 2091 // We expect that there is at least one job in the last few months. 2092 if len(jobs) == 0 { 2093 t.Fatal("did not get any jobs") 2094 } 2095} 2096 2097const tokyo = "asia-northeast1" 2098 2099func TestIntegration_Location(t *testing.T) { 2100 if client == nil { 2101 t.Skip("Integration tests skipped") 2102 } 2103 client.Location = "" 2104 testLocation(t, tokyo) 2105 client.Location = tokyo 2106 defer func() { 2107 client.Location = "" 2108 }() 2109 testLocation(t, "") 2110} 2111 2112func testLocation(t *testing.T, loc string) { 2113 ctx := context.Background() 2114 tokyoDataset := client.Dataset("tokyo") 2115 err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc}) 2116 if err != nil && !hasStatusCode(err, 409) { // 409 = already exists 2117 t.Fatal(err) 2118 } 2119 md, err := tokyoDataset.Metadata(ctx) 2120 if err != nil { 2121 t.Fatal(err) 2122 } 2123 if md.Location != tokyo { 2124 t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo) 2125 } 2126 table := tokyoDataset.Table(tableIDs.New()) 2127 err = table.Create(context.Background(), &TableMetadata{ 2128 Schema: Schema{ 2129 {Name: "name", Type: StringFieldType}, 2130 {Name: "nums", Type: IntegerFieldType}, 2131 }, 2132 ExpirationTime: testTableExpiration, 2133 }) 2134 if err != nil { 2135 t.Fatal(err) 2136 } 2137 defer table.Delete(ctx) 2138 loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n"))) 2139 loader.Location = loc 2140 job, err := loader.Run(ctx) 2141 if err != nil { 2142 t.Fatal("loader.Run", err) 2143 } 2144 if job.Location() != tokyo { 2145 t.Fatalf("job location: got %s, want %s", job.Location(), tokyo) 2146 } 2147 _, err = client.JobFromID(ctx, job.ID()) 2148 if client.Location == "" && err == nil { 2149 t.Error("JobFromID with Tokyo job, no client location: want error, got nil") 2150 } 2151 if client.Location != "" && err != nil { 2152 t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err) 2153 } 2154 _, err = client.JobFromIDLocation(ctx, job.ID(), "US") 2155 if err == nil { 2156 t.Error("JobFromIDLocation with US: want error, got nil") 2157 } 2158 job2, err := client.JobFromIDLocation(ctx, job.ID(), loc) 2159 if loc == tokyo && err != nil { 2160 t.Errorf("loc=tokyo: %v", err) 2161 } 2162 if loc == "" && err == nil { 2163 t.Error("loc empty: got nil, want error") 2164 } 2165 if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) { 2166 t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo) 2167 } 2168 if err := wait(ctx, job); err != nil { 2169 t.Fatal(err) 2170 } 2171 // Cancel should succeed even if the job is done. 2172 if err := job.Cancel(ctx); err != nil { 2173 t.Fatal(err) 2174 } 2175 2176 q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID)) 2177 q.Location = loc 2178 iter, err := q.Read(ctx) 2179 if err != nil { 2180 t.Fatal(err) 2181 } 2182 wantRows := [][]Value{ 2183 {"a", int64(0)}, 2184 {"b", int64(1)}, 2185 {"c", int64(2)}, 2186 } 2187 checkRead(t, "location", iter, wantRows) 2188 2189 table2 := tokyoDataset.Table(tableIDs.New()) 2190 copier := table2.CopierFrom(table) 2191 copier.Location = loc 2192 if _, err := copier.Run(ctx); err != nil { 2193 t.Fatal(err) 2194 } 2195 bucketName := testutil.ProjID() 2196 objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID) 2197 uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName) 2198 defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx) 2199 gr := NewGCSReference(uri) 2200 gr.DestinationFormat = CSV 2201 e := table.ExtractorTo(gr) 2202 e.Location = loc 2203 if _, err := e.Run(ctx); err != nil { 2204 t.Fatal(err) 2205 } 2206} 2207 2208func TestIntegration_NumericErrors(t *testing.T) { 2209 // Verify that the service returns an error for a big.Rat that's too large. 2210 if client == nil { 2211 t.Skip("Integration tests skipped") 2212 } 2213 ctx := context.Background() 2214 schema := Schema{{Name: "n", Type: NumericFieldType}} 2215 table := newTable(t, schema) 2216 defer table.Delete(ctx) 2217 tooBigRat := &big.Rat{} 2218 if _, ok := tooBigRat.SetString("1e40"); !ok { 2219 t.Fatal("big.Rat.SetString failed") 2220 } 2221 ins := table.Inserter() 2222 err := ins.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}}) 2223 if err == nil { 2224 t.Fatal("got nil, want error") 2225 } 2226} 2227 2228func TestIntegration_QueryErrors(t *testing.T) { 2229 // Verify that a bad query returns an appropriate error. 2230 if client == nil { 2231 t.Skip("Integration tests skipped") 2232 } 2233 ctx := context.Background() 2234 q := client.Query("blah blah broken") 2235 _, err := q.Read(ctx) 2236 const want = "invalidQuery" 2237 if !strings.Contains(err.Error(), want) { 2238 t.Fatalf("got %q, want substring %q", err, want) 2239 } 2240} 2241 2242func TestIntegration_ModelLifecycle(t *testing.T) { 2243 if client == nil { 2244 t.Skip("Integration tests skipped") 2245 } 2246 ctx := context.Background() 2247 2248 // Create a model via a CREATE MODEL query 2249 modelID := modelIDs.New() 2250 model := dataset.Model(modelID) 2251 modelRef := fmt.Sprintf("%s.%s.%s", dataset.ProjectID, dataset.DatasetID, modelID) 2252 2253 sql := fmt.Sprintf(` 2254 CREATE MODEL `+"`%s`"+` 2255 OPTIONS ( 2256 model_type='linear_reg', 2257 max_iteration=1, 2258 learn_rate=0.4, 2259 learn_rate_strategy='constant' 2260 ) AS ( 2261 SELECT 'a' AS f1, 2.0 AS label 2262 UNION ALL 2263 SELECT 'b' AS f1, 3.8 AS label 2264 )`, modelRef) 2265 if err := runQueryJob(ctx, sql); err != nil { 2266 t.Fatal(err) 2267 } 2268 defer model.Delete(ctx) 2269 2270 // Get the model metadata. 2271 curMeta, err := model.Metadata(ctx) 2272 if err != nil { 2273 t.Fatalf("couldn't get metadata: %v", err) 2274 } 2275 2276 want := "LINEAR_REGRESSION" 2277 if curMeta.Type != want { 2278 t.Errorf("Model type mismatch. Want %s got %s", curMeta.Type, want) 2279 } 2280 2281 // Ensure training metadata is available. 2282 runs := curMeta.RawTrainingRuns() 2283 if runs == nil { 2284 t.Errorf("training runs unpopulated.") 2285 } 2286 labelCols, err := curMeta.RawLabelColumns() 2287 if err != nil { 2288 t.Fatalf("failed to get label cols: %v", err) 2289 } 2290 if labelCols == nil { 2291 t.Errorf("label column information unpopulated.") 2292 } 2293 featureCols, err := curMeta.RawFeatureColumns() 2294 if err != nil { 2295 t.Fatalf("failed to get feature cols: %v", err) 2296 } 2297 if featureCols == nil { 2298 t.Errorf("feature column information unpopulated.") 2299 } 2300 2301 // Update mutable fields via API. 2302 expiry := time.Now().Add(24 * time.Hour).Truncate(time.Millisecond) 2303 2304 upd := ModelMetadataToUpdate{ 2305 Description: "new", 2306 Name: "friendly", 2307 ExpirationTime: expiry, 2308 } 2309 2310 newMeta, err := model.Update(ctx, upd, curMeta.ETag) 2311 if err != nil { 2312 t.Fatalf("failed to update: %v", err) 2313 } 2314 2315 want = "new" 2316 if newMeta.Description != want { 2317 t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want) 2318 } 2319 want = "friendly" 2320 if newMeta.Name != want { 2321 t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want) 2322 } 2323 if newMeta.ExpirationTime != expiry { 2324 t.Fatalf("ExpirationTime not updated. got %v want %v", newMeta.ExpirationTime, expiry) 2325 } 2326 2327 // Ensure presence when enumerating the model list. 2328 it := dataset.Models(ctx) 2329 seen := false 2330 for { 2331 mdl, err := it.Next() 2332 if err == iterator.Done { 2333 break 2334 } 2335 if err != nil { 2336 t.Fatal(err) 2337 } 2338 if mdl.ModelID == modelID { 2339 seen = true 2340 } 2341 } 2342 if !seen { 2343 t.Fatal("model not listed in dataset") 2344 } 2345 2346 // Delete the model. 2347 if err := model.Delete(ctx); err != nil { 2348 t.Fatalf("failed to delete model: %v", err) 2349 } 2350} 2351 2352func TestIntegration_RoutineScalarUDF(t *testing.T) { 2353 if client == nil { 2354 t.Skip("Integration tests skipped") 2355 } 2356 ctx := context.Background() 2357 2358 // Create a scalar UDF routine via API. 2359 routineID := routineIDs.New() 2360 routine := dataset.Routine(routineID) 2361 err := routine.Create(ctx, &RoutineMetadata{ 2362 Type: "SCALAR_FUNCTION", 2363 Language: "SQL", 2364 Body: "x * 3", 2365 Arguments: []*RoutineArgument{ 2366 { 2367 Name: "x", 2368 DataType: &StandardSQLDataType{ 2369 TypeKind: "INT64", 2370 }, 2371 }, 2372 }, 2373 }) 2374 if err != nil { 2375 t.Fatalf("Create: %v", err) 2376 } 2377} 2378 2379func TestIntegration_RoutineComplexTypes(t *testing.T) { 2380 if client == nil { 2381 t.Skip("Integration tests skipped") 2382 } 2383 ctx := context.Background() 2384 2385 routineID := routineIDs.New() 2386 routine := dataset.Routine(routineID) 2387 sql := fmt.Sprintf(` 2388 CREATE FUNCTION `+"`%s`("+` 2389 arr ARRAY<STRUCT<name STRING, val INT64>> 2390 ) AS ( 2391 (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem) 2392 )`, 2393 routine.FullyQualifiedName()) 2394 if err := runQueryJob(ctx, sql); err != nil { 2395 t.Fatal(err) 2396 } 2397 defer routine.Delete(ctx) 2398 2399 meta, err := routine.Metadata(ctx) 2400 if err != nil { 2401 t.Fatalf("Metadata: %v", err) 2402 } 2403 if meta.Type != "SCALAR_FUNCTION" { 2404 t.Fatalf("routine type mismatch, got %s want SCALAR_FUNCTION", meta.Type) 2405 } 2406 if meta.Language != "SQL" { 2407 t.Fatalf("language type mismatch, got %s want SQL", meta.Language) 2408 } 2409 want := []*RoutineArgument{ 2410 { 2411 Name: "arr", 2412 DataType: &StandardSQLDataType{ 2413 TypeKind: "ARRAY", 2414 ArrayElementType: &StandardSQLDataType{ 2415 TypeKind: "STRUCT", 2416 StructType: &StandardSQLStructType{ 2417 Fields: []*StandardSQLField{ 2418 { 2419 Name: "name", 2420 Type: &StandardSQLDataType{ 2421 TypeKind: "STRING", 2422 }, 2423 }, 2424 { 2425 Name: "val", 2426 Type: &StandardSQLDataType{ 2427 TypeKind: "INT64", 2428 }, 2429 }, 2430 }, 2431 }, 2432 }, 2433 }, 2434 }, 2435 } 2436 if diff := testutil.Diff(meta.Arguments, want); diff != "" { 2437 t.Fatalf("%+v: -got, +want:\n%s", meta.Arguments, diff) 2438 } 2439} 2440 2441func TestIntegration_RoutineLifecycle(t *testing.T) { 2442 if client == nil { 2443 t.Skip("Integration tests skipped") 2444 } 2445 ctx := context.Background() 2446 2447 // Create a scalar UDF routine via a CREATE FUNCTION query 2448 routineID := routineIDs.New() 2449 routine := dataset.Routine(routineID) 2450 2451 sql := fmt.Sprintf(` 2452 CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, 2453 routine.FullyQualifiedName()) 2454 if err := runQueryJob(ctx, sql); err != nil { 2455 t.Fatal(err) 2456 } 2457 defer routine.Delete(ctx) 2458 2459 // Get the routine metadata. 2460 curMeta, err := routine.Metadata(ctx) 2461 if err != nil { 2462 t.Fatalf("couldn't get metadata: %v", err) 2463 } 2464 2465 want := "SCALAR_FUNCTION" 2466 if curMeta.Type != want { 2467 t.Errorf("Routine type mismatch. got %s want %s", curMeta.Type, want) 2468 } 2469 2470 want = "SQL" 2471 if curMeta.Language != want { 2472 t.Errorf("Language mismatch. got %s want %s", curMeta.Language, want) 2473 } 2474 2475 // Perform an update to change the routine body and description. 2476 want = "x * 4" 2477 wantDescription := "an updated description" 2478 // during beta, update doesn't allow partial updates. Provide all fields. 2479 newMeta, err := routine.Update(ctx, &RoutineMetadataToUpdate{ 2480 Body: want, 2481 Arguments: curMeta.Arguments, 2482 Description: wantDescription, 2483 ReturnType: curMeta.ReturnType, 2484 Type: curMeta.Type, 2485 }, curMeta.ETag) 2486 if err != nil { 2487 t.Fatalf("Update: %v", err) 2488 } 2489 if newMeta.Body != want { 2490 t.Fatalf("Update body failed. want %s got %s", want, newMeta.Body) 2491 } 2492 if newMeta.Description != wantDescription { 2493 t.Fatalf("Update description failed. want %s got %s", wantDescription, newMeta.Description) 2494 } 2495 2496 // Ensure presence when enumerating the model list. 2497 it := dataset.Routines(ctx) 2498 seen := false 2499 for { 2500 r, err := it.Next() 2501 if err == iterator.Done { 2502 break 2503 } 2504 if err != nil { 2505 t.Fatal(err) 2506 } 2507 if r.RoutineID == routineID { 2508 seen = true 2509 } 2510 } 2511 if !seen { 2512 t.Fatal("routine not listed in dataset") 2513 } 2514 2515 // Delete the model. 2516 if err := routine.Delete(ctx); err != nil { 2517 t.Fatalf("failed to delete routine: %v", err) 2518 } 2519} 2520 2521// Creates a new, temporary table with a unique name and the given schema. 2522func newTable(t *testing.T, s Schema) *Table { 2523 table := dataset.Table(tableIDs.New()) 2524 err := table.Create(context.Background(), &TableMetadata{ 2525 Schema: s, 2526 ExpirationTime: testTableExpiration, 2527 }) 2528 if err != nil { 2529 t.Fatal(err) 2530 } 2531 return table 2532} 2533 2534func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) { 2535 if msg2, ok := compareRead(it, want, false); !ok { 2536 t.Errorf("%s: %s", msg, msg2) 2537 } 2538} 2539 2540func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) { 2541 if msg2, ok := compareRead(it, want, true); !ok { 2542 t.Errorf("%s: %s", msg, msg2) 2543 } 2544} 2545 2546func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) { 2547 got, _, totalRows, err := readAll(it) 2548 if err != nil { 2549 return err.Error(), false 2550 } 2551 if len(got) != len(want) { 2552 return fmt.Sprintf("got %d rows, want %d", len(got), len(want)), false 2553 } 2554 if compareTotalRows && len(got) != int(totalRows) { 2555 return fmt.Sprintf("got %d rows, but totalRows = %d", len(got), totalRows), false 2556 } 2557 sort.Sort(byCol0(got)) 2558 for i, r := range got { 2559 gotRow := []Value(r) 2560 wantRow := want[i] 2561 if !testutil.Equal(gotRow, wantRow) { 2562 return fmt.Sprintf("#%d: got %#v, want %#v", i, gotRow, wantRow), false 2563 } 2564 } 2565 return "", true 2566} 2567 2568func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) { 2569 var ( 2570 rows [][]Value 2571 schema Schema 2572 totalRows uint64 2573 ) 2574 for { 2575 var vals []Value 2576 err := it.Next(&vals) 2577 if err == iterator.Done { 2578 return rows, schema, totalRows, nil 2579 } 2580 if err != nil { 2581 return nil, nil, 0, err 2582 } 2583 rows = append(rows, vals) 2584 schema = it.Schema 2585 totalRows = it.TotalRows 2586 } 2587} 2588 2589type byCol0 [][]Value 2590 2591func (b byCol0) Len() int { return len(b) } 2592func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 2593func (b byCol0) Less(i, j int) bool { 2594 switch a := b[i][0].(type) { 2595 case string: 2596 return a < b[j][0].(string) 2597 case civil.Date: 2598 return a.Before(b[j][0].(civil.Date)) 2599 default: 2600 panic("unknown type") 2601 } 2602} 2603 2604func hasStatusCode(err error, code int) bool { 2605 if e, ok := err.(*googleapi.Error); ok && e.Code == code { 2606 return true 2607 } 2608 return false 2609} 2610 2611// wait polls the job until it is complete or an error is returned. 2612func wait(ctx context.Context, job *Job) error { 2613 status, err := job.Wait(ctx) 2614 if err != nil { 2615 return err 2616 } 2617 if status.Err() != nil { 2618 return fmt.Errorf("job status error: %#v", status.Err()) 2619 } 2620 if status.Statistics == nil { 2621 return errors.New("nil Statistics") 2622 } 2623 if status.Statistics.EndTime.IsZero() { 2624 return errors.New("EndTime is zero") 2625 } 2626 if status.Statistics.Details == nil { 2627 return errors.New("nil Statistics.Details") 2628 } 2629 return nil 2630} 2631 2632// waitForRow polls the table until it contains a row. 2633// TODO(jba): use internal.Retry. 2634func waitForRow(ctx context.Context, table *Table) error { 2635 for { 2636 it := table.Read(ctx) 2637 var v []Value 2638 err := it.Next(&v) 2639 if err == nil { 2640 return nil 2641 } 2642 if err != iterator.Done { 2643 return err 2644 } 2645 time.Sleep(1 * time.Second) 2646 } 2647} 2648 2649func putError(err error) string { 2650 pme, ok := err.(PutMultiError) 2651 if !ok { 2652 return err.Error() 2653 } 2654 var msgs []string 2655 for _, err := range pme { 2656 msgs = append(msgs, err.Error()) 2657 } 2658 return strings.Join(msgs, "\n") 2659} 2660