1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"encoding/json"
20	"errors"
21	"flag"
22	"fmt"
23	"log"
24	"math/big"
25	"net/http"
26	"os"
27	"sort"
28	"strings"
29	"testing"
30	"time"
31
32	"cloud.google.com/go/civil"
33	"cloud.google.com/go/httpreplay"
34	"cloud.google.com/go/internal"
35	"cloud.google.com/go/internal/pretty"
36	"cloud.google.com/go/internal/testutil"
37	"cloud.google.com/go/internal/uid"
38	"cloud.google.com/go/storage"
39	"github.com/google/go-cmp/cmp"
40	"github.com/google/go-cmp/cmp/cmpopts"
41	gax "github.com/googleapis/gax-go/v2"
42	"google.golang.org/api/googleapi"
43	"google.golang.org/api/iterator"
44	"google.golang.org/api/option"
45)
46
// replayFilename is the HTTP record/replay log used by the httpreplay-based
// test modes (see initIntegrationTest).
const replayFilename = "bigquery.replay"

// record, when set via -record, re-records all RPCs into replayFilename
// instead of hitting the replay log.
var record = flag.Bool("record", false, "record RPCs")
50
var (
	// client is the shared BigQuery client; nil means integration tests are skipped.
	client *Client
	// storageClient is used by tests that load/extract via Cloud Storage.
	storageClient *storage.Client
	// dataset is the per-run dataset that holds all test tables.
	dataset *Dataset
	// schema is the default table schema used by most tests: a string,
	// a repeated integer, and a nested record containing a boolean.
	schema = Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "nums", Type: IntegerFieldType, Repeated: true},
		{Name: "rec", Type: RecordFieldType, Schema: Schema{
			{Name: "bool", Type: BooleanFieldType},
		}},
	}
	// testTableExpiration is the default expiration applied to test tables;
	// set deterministically in initTestState.
	testTableExpiration time.Time
	// Unique-ID generators for datasets, tables, models, and routines,
	// seeded deterministically in initTestState for replayability.
	datasetIDs, tableIDs, modelIDs, routineIDs *uid.Space
)
65
66// Note: integration tests cannot be run in parallel, because TestIntegration_Location
67// modifies the client.
68
69func TestMain(m *testing.M) {
70	cleanup := initIntegrationTest()
71	r := m.Run()
72	cleanup()
73	os.Exit(r)
74}
75
76func getClient(t *testing.T) *Client {
77	if client == nil {
78		t.Skip("Integration tests skipped")
79	}
80	return client
81}
82
83var grpcHeadersChecker = testutil.DefaultHeadersEnforcer()
84
// If integration tests will be run, create a unique dataset for them.
// Return a cleanup function.
//
// There are four modes:
//   - -short combined with -record: invalid, aborts the process.
//   - -short with a usable replay file: replays recorded RPCs.
//   - -short otherwise: skips integration tests (client stays nil).
//   - default: runs against the real backend, recording if -record is set.
func initIntegrationTest() func() {
	ctx := context.Background()
	flag.Parse() // needed for testing.Short()
	projID := testutil.ProjID()
	switch {
	case testing.Short() && *record:
		log.Fatal("cannot combine -short and -record")
		return func() {}

	case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "":
		// go test -short with a replay file will replay the integration tests if the
		// environment variables are set.
		log.Printf("replaying from %s", replayFilename)
		httpreplay.DebugHeaders()
		replayer, err := httpreplay.NewReplayer(replayFilename)
		if err != nil {
			log.Fatal(err)
		}
		// The replay file's initial state carries the time of recording;
		// reuse it so generated IDs and expirations match the recording.
		var t time.Time
		if err := json.Unmarshal(replayer.Initial(), &t); err != nil {
			log.Fatal(err)
		}
		hc, err := replayer.Client(ctx) // no creds needed
		if err != nil {
			log.Fatal(err)
		}
		client, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		cleanup := initTestState(client, t)
		return func() {
			cleanup()
			_ = replayer.Close() // No actionable error returned.
		}

	case testing.Short():
		// go test -short without a replay file skips the integration tests.
		if testutil.CanReplay(replayFilename) && projID != "" {
			log.Print("replay not supported for Go versions before 1.8")
		}
		client = nil
		storageClient = nil
		return func() {}

	default: // Run integration tests against a real backend.
		ts := testutil.TokenSource(ctx, Scope)
		if ts == nil {
			log.Println("Integration tests skipped. See CONTRIBUTING.md for details")
			return func() {}
		}
		bqOpts := []option.ClientOption{option.WithTokenSource(ts)}
		sOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))}
		cleanup := func() {}
		now := time.Now().UTC()
		if *record {
			if !httpreplay.Supported() {
				log.Print("record not supported for Go versions before 1.8")
			} else {
				// Store the current time in the replay file's initial state
				// so that a later replay can reconstruct the same test state
				// (see the replay case above).
				nowBytes, err := json.Marshal(now)
				if err != nil {
					log.Fatal(err)
				}
				recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes)
				if err != nil {
					log.Fatalf("could not record: %v", err)
				}
				log.Printf("recording to %s", replayFilename)
				hc, err := recorder.Client(ctx, bqOpts...)
				if err != nil {
					log.Fatal(err)
				}
				bqOpts = append(bqOpts, option.WithHTTPClient(hc))
				hc, err = recorder.Client(ctx, sOpts...)
				if err != nil {
					log.Fatal(err)
				}
				sOpts = append(sOpts, option.WithHTTPClient(hc))
				cleanup = func() {
					if err := recorder.Close(); err != nil {
						log.Printf("saving recording: %v", err)
					}
				}
			}
		} else {
			// When we're not recording, do http header checking.
			// We can't check universally because option.WithHTTPClient is
			// incompatible with gRPC options.
			bqOpts = append(bqOpts, grpcHeadersChecker.CallOptions()...)
			sOpts = append(sOpts, grpcHeadersChecker.CallOptions()...)
		}
		var err error
		client, err = NewClient(ctx, projID, bqOpts...)
		if err != nil {
			log.Fatalf("NewClient: %v", err)
		}
		storageClient, err = storage.NewClient(ctx, sOpts...)
		if err != nil {
			log.Fatalf("storage.NewClient: %v", err)
		}
		c := initTestState(client, now)
		return func() { c(); cleanup() }
	}
}
195
// initTestState seeds all package-level test state (ID generators, random
// seed, table expiration) deterministically from t, creates the shared test
// dataset, and returns a cleanup function that deletes that dataset along
// with its contents.
func initTestState(client *Client, t time.Time) func() {
	// BigQuery does not accept hyphens in dataset or table IDs, so we create IDs
	// with underscores.
	ctx := context.Background()
	opts := &uid.Options{Sep: '_', Time: t}
	datasetIDs = uid.NewSpace("dataset", opts)
	tableIDs = uid.NewSpace("table", opts)
	modelIDs = uid.NewSpace("model", opts)
	routineIDs = uid.NewSpace("routine", opts)
	testTableExpiration = t.Add(10 * time.Minute).Round(time.Second)
	// For replayability, seed the random source with t.
	Seed(t.UnixNano())

	dataset = client.Dataset(datasetIDs.New())
	if err := dataset.Create(ctx, nil); err != nil {
		log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err)
	}
	return func() {
		if err := dataset.DeleteWithContents(ctx); err != nil {
			log.Printf("could not delete %s", dataset.DatasetID)
		}
	}
}
219
220func TestIntegration_TableCreate(t *testing.T) {
221	// Check that creating a record field with an empty schema is an error.
222	if client == nil {
223		t.Skip("Integration tests skipped")
224	}
225	table := dataset.Table("t_bad")
226	schema := Schema{
227		{Name: "rec", Type: RecordFieldType, Schema: Schema{}},
228	}
229	err := table.Create(context.Background(), &TableMetadata{
230		Schema:         schema,
231		ExpirationTime: testTableExpiration.Add(5 * time.Minute),
232	})
233	if err == nil {
234		t.Fatal("want error, got nil")
235	}
236	if !hasStatusCode(err, http.StatusBadRequest) {
237		t.Fatalf("want a 400 error, got %v", err)
238	}
239}
240
241func TestIntegration_TableCreateView(t *testing.T) {
242	if client == nil {
243		t.Skip("Integration tests skipped")
244	}
245	ctx := context.Background()
246	table := newTable(t, schema)
247	defer table.Delete(ctx)
248
249	// Test that standard SQL views work.
250	view := dataset.Table("t_view_standardsql")
251	query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`",
252		dataset.ProjectID, dataset.DatasetID, table.TableID)
253	err := view.Create(context.Background(), &TableMetadata{
254		ViewQuery:      query,
255		UseStandardSQL: true,
256	})
257	if err != nil {
258		t.Fatalf("table.create: Did not expect an error, got: %v", err)
259	}
260	if err := view.Delete(ctx); err != nil {
261		t.Fatal(err)
262	}
263}
264
265func TestIntegration_TableMetadata(t *testing.T) {
266
267	if client == nil {
268		t.Skip("Integration tests skipped")
269	}
270	ctx := context.Background()
271	table := newTable(t, schema)
272	defer table.Delete(ctx)
273	// Check table metadata.
274	md, err := table.Metadata(ctx)
275	if err != nil {
276		t.Fatal(err)
277	}
278	// TODO(jba): check md more thorougly.
279	if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want {
280		t.Errorf("metadata.FullID: got %q, want %q", got, want)
281	}
282	if got, want := md.Type, RegularTable; got != want {
283		t.Errorf("metadata.Type: got %v, want %v", got, want)
284	}
285	if got, want := md.ExpirationTime, testTableExpiration; !got.Equal(want) {
286		t.Errorf("metadata.Type: got %v, want %v", got, want)
287	}
288
289	// Check that timePartitioning is nil by default
290	if md.TimePartitioning != nil {
291		t.Errorf("metadata.TimePartitioning: got %v, want %v", md.TimePartitioning, nil)
292	}
293
294	// Create tables that have time partitioning
295	partitionCases := []struct {
296		timePartitioning TimePartitioning
297		wantExpiration   time.Duration
298		wantField        string
299		wantPruneFilter  bool
300	}{
301		{TimePartitioning{}, time.Duration(0), "", false},
302		{TimePartitioning{Expiration: time.Second}, time.Second, "", false},
303		{TimePartitioning{RequirePartitionFilter: true}, time.Duration(0), "", true},
304		{
305			TimePartitioning{
306				Expiration:             time.Second,
307				Field:                  "date",
308				RequirePartitionFilter: true,
309			}, time.Second, "date", true},
310	}
311
312	schema2 := Schema{
313		{Name: "name", Type: StringFieldType},
314		{Name: "date", Type: DateFieldType},
315	}
316
317	clustering := &Clustering{
318		Fields: []string{"name"},
319	}
320
321	// Currently, clustering depends on partitioning.  Interleave testing of the two features.
322	for i, c := range partitionCases {
323		table := dataset.Table(fmt.Sprintf("t_metadata_partition_nocluster_%v", i))
324		clusterTable := dataset.Table(fmt.Sprintf("t_metadata_partition_cluster_%v", i))
325
326		// Create unclustered, partitioned variant and get metadata.
327		err = table.Create(context.Background(), &TableMetadata{
328			Schema:           schema2,
329			TimePartitioning: &c.timePartitioning,
330			ExpirationTime:   testTableExpiration,
331		})
332		if err != nil {
333			t.Fatal(err)
334		}
335		defer table.Delete(ctx)
336		md, err := table.Metadata(ctx)
337		if err != nil {
338			t.Fatal(err)
339		}
340
341		// Created clustered table and get metadata.
342		err = clusterTable.Create(context.Background(), &TableMetadata{
343			Schema:           schema2,
344			TimePartitioning: &c.timePartitioning,
345			ExpirationTime:   testTableExpiration,
346			Clustering:       clustering,
347		})
348		if err != nil {
349			t.Fatal(err)
350		}
351		clusterMD, err := clusterTable.Metadata(ctx)
352		if err != nil {
353			t.Fatal(err)
354		}
355
356		for _, v := range []*TableMetadata{md, clusterMD} {
357			got := v.TimePartitioning
358			want := &TimePartitioning{
359				Expiration:             c.wantExpiration,
360				Field:                  c.wantField,
361				RequirePartitionFilter: c.wantPruneFilter,
362			}
363			if !testutil.Equal(got, want) {
364				t.Errorf("metadata.TimePartitioning: got %v, want %v", got, want)
365			}
366			// Manipulate RequirePartitionFilter at the table level.
367			mdUpdate := TableMetadataToUpdate{
368				RequirePartitionFilter: !want.RequirePartitionFilter,
369			}
370
371			newmd, err := table.Update(ctx, mdUpdate, "")
372			if err != nil {
373				t.Errorf("failed to invert RequirePartitionFilter on %s: %v", table.FullyQualifiedName(), err)
374			}
375			if newmd.RequirePartitionFilter == want.RequirePartitionFilter {
376				t.Errorf("inverting table-level RequirePartitionFilter on %s failed, want %t got %t", table.FullyQualifiedName(), !want.RequirePartitionFilter, newmd.RequirePartitionFilter)
377			}
378			// Also verify that the clone of RequirePartitionFilter in the TimePartitioning message is consistent.
379			if newmd.RequirePartitionFilter != newmd.TimePartitioning.RequirePartitionFilter {
380				t.Errorf("inconsistent RequirePartitionFilter.  Table: %t, TimePartitioning: %t", newmd.RequirePartitionFilter, newmd.TimePartitioning.RequirePartitionFilter)
381			}
382
383		}
384
385		if md.Clustering != nil {
386			t.Errorf("metadata.Clustering was not nil on unclustered table %s", table.TableID)
387		}
388		got := clusterMD.Clustering
389		want := clustering
390		if clusterMD.Clustering != clustering {
391			if !testutil.Equal(got, want) {
392				t.Errorf("metadata.Clustering: got %v, want %v", got, want)
393			}
394		}
395	}
396
397}
398
399func TestIntegration_RangePartitioning(t *testing.T) {
400	if client == nil {
401		t.Skip("Integration tests skipped")
402	}
403	ctx := context.Background()
404	table := dataset.Table(tableIDs.New())
405
406	schema := Schema{
407		{Name: "name", Type: StringFieldType},
408		{Name: "somevalue", Type: IntegerFieldType},
409	}
410
411	wantedRange := &RangePartitioningRange{
412		Start:    10,
413		End:      135,
414		Interval: 25,
415	}
416
417	wantedPartitioning := &RangePartitioning{
418		Field: "somevalue",
419		Range: wantedRange,
420	}
421
422	err := table.Create(context.Background(), &TableMetadata{
423		Schema:            schema,
424		RangePartitioning: wantedPartitioning,
425	})
426	if err != nil {
427		t.Fatal(err)
428	}
429	defer table.Delete(ctx)
430	md, err := table.Metadata(ctx)
431	if err != nil {
432		t.Fatal(err)
433	}
434
435	if md.RangePartitioning == nil {
436		t.Fatal("expected range partitioning, got nil")
437	}
438	got := md.RangePartitioning.Field
439	if wantedPartitioning.Field != got {
440		t.Errorf("RangePartitioning Field: got %v, want %v", got, wantedPartitioning.Field)
441	}
442	if md.RangePartitioning.Range == nil {
443		t.Fatal("expected a range definition, got nil")
444	}
445	gotInt64 := md.RangePartitioning.Range.Start
446	if gotInt64 != wantedRange.Start {
447		t.Errorf("Range.Start: got %v, wanted %v", gotInt64, wantedRange.Start)
448	}
449	gotInt64 = md.RangePartitioning.Range.End
450	if gotInt64 != wantedRange.End {
451		t.Errorf("Range.End: got %v, wanted %v", gotInt64, wantedRange.End)
452	}
453	gotInt64 = md.RangePartitioning.Range.Interval
454	if gotInt64 != wantedRange.Interval {
455		t.Errorf("Range.Interval: got %v, wanted %v", gotInt64, wantedRange.Interval)
456	}
457}
458func TestIntegration_RemoveTimePartitioning(t *testing.T) {
459	if client == nil {
460		t.Skip("Integration tests skipped")
461	}
462	ctx := context.Background()
463	table := dataset.Table(tableIDs.New())
464	want := 24 * time.Hour
465	err := table.Create(ctx, &TableMetadata{
466		ExpirationTime: testTableExpiration,
467		TimePartitioning: &TimePartitioning{
468			Expiration: want,
469		},
470	})
471	if err != nil {
472		t.Fatal(err)
473	}
474	defer table.Delete(ctx)
475
476	md, err := table.Metadata(ctx)
477	if err != nil {
478		t.Fatal(err)
479	}
480	if got := md.TimePartitioning.Expiration; got != want {
481		t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
482	}
483
484	// Remove time partitioning expiration
485	md, err = table.Update(context.Background(), TableMetadataToUpdate{
486		TimePartitioning: &TimePartitioning{Expiration: 0},
487	}, md.ETag)
488	if err != nil {
489		t.Fatal(err)
490	}
491
492	want = time.Duration(0)
493	if got := md.TimePartitioning.Expiration; got != want {
494		t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
495	}
496}
497
498func TestIntegration_DatasetCreate(t *testing.T) {
499	if client == nil {
500		t.Skip("Integration tests skipped")
501	}
502	ctx := context.Background()
503	ds := client.Dataset(datasetIDs.New())
504	wmd := &DatasetMetadata{Name: "name", Location: "EU"}
505	err := ds.Create(ctx, wmd)
506	if err != nil {
507		t.Fatal(err)
508	}
509	gmd, err := ds.Metadata(ctx)
510	if err != nil {
511		t.Fatal(err)
512	}
513	if got, want := gmd.Name, wmd.Name; got != want {
514		t.Errorf("name: got %q, want %q", got, want)
515	}
516	if got, want := gmd.Location, wmd.Location; got != want {
517		t.Errorf("location: got %q, want %q", got, want)
518	}
519	if err := ds.Delete(ctx); err != nil {
520		t.Fatalf("deleting dataset %v: %v", ds, err)
521	}
522}
523
524func TestIntegration_DatasetMetadata(t *testing.T) {
525	if client == nil {
526		t.Skip("Integration tests skipped")
527	}
528	ctx := context.Background()
529	md, err := dataset.Metadata(ctx)
530	if err != nil {
531		t.Fatal(err)
532	}
533	if got, want := md.FullID, fmt.Sprintf("%s:%s", dataset.ProjectID, dataset.DatasetID); got != want {
534		t.Errorf("FullID: got %q, want %q", got, want)
535	}
536	jan2016 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
537	if md.CreationTime.Before(jan2016) {
538		t.Errorf("CreationTime: got %s, want > 2016-1-1", md.CreationTime)
539	}
540	if md.LastModifiedTime.Before(jan2016) {
541		t.Errorf("LastModifiedTime: got %s, want > 2016-1-1", md.LastModifiedTime)
542	}
543
544	// Verify that we get a NotFound for a nonexistent dataset.
545	_, err = client.Dataset("does_not_exist").Metadata(ctx)
546	if err == nil || !hasStatusCode(err, http.StatusNotFound) {
547		t.Errorf("got %v, want NotFound error", err)
548	}
549}
550
551func TestIntegration_DatasetDelete(t *testing.T) {
552	if client == nil {
553		t.Skip("Integration tests skipped")
554	}
555	ctx := context.Background()
556	ds := client.Dataset(datasetIDs.New())
557	if err := ds.Create(ctx, nil); err != nil {
558		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
559	}
560	if err := ds.Delete(ctx); err != nil {
561		t.Fatalf("deleting dataset %s: %v", ds.DatasetID, err)
562	}
563}
564
565func TestIntegration_DatasetDeleteWithContents(t *testing.T) {
566	if client == nil {
567		t.Skip("Integration tests skipped")
568	}
569	ctx := context.Background()
570	ds := client.Dataset(datasetIDs.New())
571	if err := ds.Create(ctx, nil); err != nil {
572		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
573	}
574	table := ds.Table(tableIDs.New())
575	if err := table.Create(ctx, nil); err != nil {
576		t.Fatalf("creating table %s in dataset %s: %v", table.TableID, table.DatasetID, err)
577	}
578	// We expect failure here
579	if err := ds.Delete(ctx); err == nil {
580		t.Fatalf("non-recursive delete of dataset %s succeeded unexpectedly.", ds.DatasetID)
581	}
582	if err := ds.DeleteWithContents(ctx); err != nil {
583		t.Fatalf("deleting recursively dataset %s: %v", ds.DatasetID, err)
584	}
585}
586
587func TestIntegration_DatasetUpdateETags(t *testing.T) {
588	if client == nil {
589		t.Skip("Integration tests skipped")
590	}
591
592	check := func(md *DatasetMetadata, wantDesc, wantName string) {
593		if md.Description != wantDesc {
594			t.Errorf("description: got %q, want %q", md.Description, wantDesc)
595		}
596		if md.Name != wantName {
597			t.Errorf("name: got %q, want %q", md.Name, wantName)
598		}
599	}
600
601	ctx := context.Background()
602	md, err := dataset.Metadata(ctx)
603	if err != nil {
604		t.Fatal(err)
605	}
606	if md.ETag == "" {
607		t.Fatal("empty ETag")
608	}
609	// Write without ETag succeeds.
610	desc := md.Description + "d2"
611	name := md.Name + "n2"
612	md2, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: desc, Name: name}, "")
613	if err != nil {
614		t.Fatal(err)
615	}
616	check(md2, desc, name)
617
618	// Write with original ETag fails because of intervening write.
619	_, err = dataset.Update(ctx, DatasetMetadataToUpdate{Description: "d", Name: "n"}, md.ETag)
620	if err == nil {
621		t.Fatal("got nil, want error")
622	}
623
624	// Write with most recent ETag succeeds.
625	md3, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: "", Name: ""}, md2.ETag)
626	if err != nil {
627		t.Fatal(err)
628	}
629	check(md3, "", "")
630}
631
632func TestIntegration_DatasetUpdateDefaultExpiration(t *testing.T) {
633	if client == nil {
634		t.Skip("Integration tests skipped")
635	}
636	ctx := context.Background()
637	_, err := dataset.Metadata(ctx)
638	if err != nil {
639		t.Fatal(err)
640	}
641	// Set the default expiration time.
642	md, err := dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Hour}, "")
643	if err != nil {
644		t.Fatal(err)
645	}
646	if md.DefaultTableExpiration != time.Hour {
647		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
648	}
649	// Omitting DefaultTableExpiration doesn't change it.
650	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{Name: "xyz"}, "")
651	if err != nil {
652		t.Fatal(err)
653	}
654	if md.DefaultTableExpiration != time.Hour {
655		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
656	}
657	// Setting it to 0 deletes it (which looks like a 0 duration).
658	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Duration(0)}, "")
659	if err != nil {
660		t.Fatal(err)
661	}
662	if md.DefaultTableExpiration != 0 {
663		t.Fatalf("got %s, want 0", md.DefaultTableExpiration)
664	}
665}
666
667func TestIntegration_DatasetUpdateAccess(t *testing.T) {
668	if client == nil {
669		t.Skip("Integration tests skipped")
670	}
671	ctx := context.Background()
672	md, err := dataset.Metadata(ctx)
673	if err != nil {
674		t.Fatal(err)
675	}
676	origAccess := append([]*AccessEntry(nil), md.Access...)
677	newEntries := []*AccessEntry{
678		{
679			Role:       ReaderRole,
680			Entity:     "Joe@example.com",
681			EntityType: UserEmailEntity,
682		},
683		{
684			Role:       ReaderRole,
685			Entity:     "allUsers",
686			EntityType: IAMMemberEntity,
687		},
688	}
689
690	newAccess := append(md.Access, newEntries...)
691	dm := DatasetMetadataToUpdate{Access: newAccess}
692	md, err = dataset.Update(ctx, dm, md.ETag)
693	if err != nil {
694		t.Fatal(err)
695	}
696	defer func() {
697		_, err := dataset.Update(ctx, DatasetMetadataToUpdate{Access: origAccess}, md.ETag)
698		if err != nil {
699			t.Log("could not restore dataset access list")
700		}
701	}()
702	for _, v := range md.Access {
703		fmt.Printf("md %+v\n", v)
704	}
705	for _, v := range newAccess {
706		fmt.Printf("newAccess %+v\n", v)
707	}
708	if diff := testutil.Diff(md.Access, newAccess, cmpopts.SortSlices(lessAccessEntries)); diff != "" {
709		t.Fatalf("got=-, want=+:\n%s", diff)
710	}
711}
712
713// Comparison function for AccessEntries to enable order insensitive equality checking.
714func lessAccessEntries(x, y *AccessEntry) bool {
715	if x.Entity < y.Entity {
716		return true
717	}
718	if x.Entity > y.Entity {
719		return false
720	}
721	if x.EntityType < y.EntityType {
722		return true
723	}
724	if x.EntityType > y.EntityType {
725		return false
726	}
727	if x.Role < y.Role {
728		return true
729	}
730	if x.Role > y.Role {
731		return false
732	}
733	if x.View == nil {
734		return y.View != nil
735	}
736	return false
737}
738
739func TestIntegration_DatasetUpdateLabels(t *testing.T) {
740	if client == nil {
741		t.Skip("Integration tests skipped")
742	}
743	ctx := context.Background()
744	_, err := dataset.Metadata(ctx)
745	if err != nil {
746		t.Fatal(err)
747	}
748	var dm DatasetMetadataToUpdate
749	dm.SetLabel("label", "value")
750	md, err := dataset.Update(ctx, dm, "")
751	if err != nil {
752		t.Fatal(err)
753	}
754	if got, want := md.Labels["label"], "value"; got != want {
755		t.Errorf("got %q, want %q", got, want)
756	}
757	dm = DatasetMetadataToUpdate{}
758	dm.DeleteLabel("label")
759	md, err = dataset.Update(ctx, dm, "")
760	if err != nil {
761		t.Fatal(err)
762	}
763	if _, ok := md.Labels["label"]; ok {
764		t.Error("label still present after deletion")
765	}
766}
767
768func TestIntegration_TableUpdateLabels(t *testing.T) {
769	if client == nil {
770		t.Skip("Integration tests skipped")
771	}
772	ctx := context.Background()
773	table := newTable(t, schema)
774	defer table.Delete(ctx)
775
776	var tm TableMetadataToUpdate
777	tm.SetLabel("label", "value")
778	md, err := table.Update(ctx, tm, "")
779	if err != nil {
780		t.Fatal(err)
781	}
782	if got, want := md.Labels["label"], "value"; got != want {
783		t.Errorf("got %q, want %q", got, want)
784	}
785	tm = TableMetadataToUpdate{}
786	tm.DeleteLabel("label")
787	md, err = table.Update(ctx, tm, "")
788	if err != nil {
789		t.Fatal(err)
790	}
791	if _, ok := md.Labels["label"]; ok {
792		t.Error("label still present after deletion")
793	}
794}
795
796func TestIntegration_Tables(t *testing.T) {
797	if client == nil {
798		t.Skip("Integration tests skipped")
799	}
800	ctx := context.Background()
801	table := newTable(t, schema)
802	defer table.Delete(ctx)
803	wantName := table.FullyQualifiedName()
804
805	// This test is flaky due to eventual consistency.
806	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
807	defer cancel()
808	err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
809		// Iterate over tables in the dataset.
810		it := dataset.Tables(ctx)
811		var tableNames []string
812		for {
813			tbl, err := it.Next()
814			if err == iterator.Done {
815				break
816			}
817			if err != nil {
818				return false, err
819			}
820			tableNames = append(tableNames, tbl.FullyQualifiedName())
821		}
822		// Other tests may be running with this dataset, so there might be more
823		// than just our table in the list. So don't try for an exact match; just
824		// make sure that our table is there somewhere.
825		for _, tn := range tableNames {
826			if tn == wantName {
827				return true, nil
828			}
829		}
830		return false, fmt.Errorf("got %v\nwant %s in the list", tableNames, wantName)
831	})
832	if err != nil {
833		t.Fatal(err)
834	}
835}
836
837func TestIntegration_SimpleRowResults(t *testing.T) {
838	if client == nil {
839		t.Skip("Integration tests skipped")
840	}
841	ctx := context.Background()
842
843	testCases := []struct {
844		description string
845		query       string
846		want        [][]Value
847	}{
848		{
849			description: "literals",
850			query:       "select 17 as foo",
851			want:        [][]Value{{int64(17)}},
852		},
853		{
854			description: "empty results",
855			query:       "SELECT * FROM (select 17 as foo) where false",
856			want:        [][]Value{},
857		},
858		{
859			// Note: currently CTAS returns the rows due to the destination table reference,
860			// but it's not clear that it should.
861			// https://github.com/googleapis/google-cloud-go/issues/1467 for followup.
862			description: "ctas ddl",
863			query:       fmt.Sprintf("CREATE TABLE %s.%s AS SELECT 17 as foo", dataset.DatasetID, tableIDs.New()),
864			want:        [][]Value{{int64(17)}},
865		},
866	}
867	for _, tc := range testCases {
868		curCase := tc
869		t.Run(curCase.description, func(t *testing.T) {
870			t.Parallel()
871			q := client.Query(curCase.query)
872			it, err := q.Read(ctx)
873			if err != nil {
874				t.Fatalf("%s read error: %v", curCase.description, err)
875			}
876			checkReadAndTotalRows(t, curCase.description, it, curCase.want)
877		})
878	}
879}
880
// TestIntegration_InsertAndRead streams rows into a table and reads them
// back through every read path: direct table reads, query reads, job reads,
// and row decoding into both []Value and map destinations.
func TestIntegration_InsertAndRead(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Populate the table.
	ins := table.Inserter()
	var (
		wantRows  [][]Value
		saverRows []*ValuesSaver
	)
	for i, name := range []string{"a", "b", "c"} {
		row := []Value{name, []Value{int64(i)}, []Value{true}}
		wantRows = append(wantRows, row)
		saverRows = append(saverRows, &ValuesSaver{
			Schema:   schema,
			InsertID: name,
			Row:      row,
		})
	}
	if err := ins.Put(ctx, saverRows); err != nil {
		t.Fatal(putError(err))
	}

	// Wait until the data has been uploaded. This can take a few seconds, according
	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}
	// Read the table.
	checkRead(t, "upload", table.Read(ctx), wantRows)

	// Query the table.
	q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID))
	q.DefaultProjectID = dataset.ProjectID
	q.DefaultDatasetID = dataset.DatasetID

	rit, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "query", rit, wantRows)

	// Query the long way: run the job, re-fetch it by ID, then read from it.
	job1, err := q.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if job1.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	job2, err := client.JobFromID(ctx, job1.ID())
	if err != nil {
		t.Fatal(err)
	}
	if job2.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	rit, err = job2.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "job.Read", rit, wantRows)

	// Get statistics.
	jobStatus, err := job2.Status(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if jobStatus.Statistics == nil {
		t.Fatal("jobStatus missing statistics")
	}
	if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok {
		t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details)
	}

	// Test reading directly into a []Value.
	// Note: this `schema` shadows the package-level schema variable.
	valueLists, schema, _, err := readAll(table.Read(ctx))
	if err != nil {
		t.Fatal(err)
	}
	it := table.Read(ctx)
	for i, vl := range valueLists {
		var got []Value
		if err := it.Next(&got); err != nil {
			t.Fatal(err)
		}
		if !testutil.Equal(it.Schema, schema) {
			t.Fatalf("got schema %v, want %v", it.Schema, schema)
		}
		want := []Value(vl)
		if !testutil.Equal(got, want) {
			t.Errorf("%d: got %v, want %v", i, got, want)
		}
	}

	// Test reading into a map.
	it = table.Read(ctx)
	for _, vl := range valueLists {
		var vm map[string]Value
		if err := it.Next(&vm); err != nil {
			t.Fatal(err)
		}
		if got, want := len(vm), len(vl); got != want {
			t.Fatalf("valueMap len: got %d, want %d", got, want)
		}
		// With maps, structs become nested maps.
		vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]}
		for i, v := range vl {
			if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) {
				t.Errorf("%d, name=%s: got %#v, want %#v",
					i, schema[i].Name, got, want)
			}
		}
	}

}
1001
// SubSubTestStruct is the innermost nested record used in schema-inference
// round-trip tests.
type SubSubTestStruct struct {
	Integer int64
}
1005
// SubTestStruct is a nested record containing both a single sub-record and
// a repeated sub-record, used in schema-inference round-trip tests.
type SubTestStruct struct {
	String      string
	Record      SubSubTestStruct
	RecordArray []SubSubTestStruct
}
1011
// TestStruct covers scalar field types, their repeated (array) forms, and
// nested/repeated records, for insert-and-read round-trip tests via
// InferSchema.
type TestStruct struct {
	Name      string
	Bytes     []byte
	Integer   int64
	Float     float64
	Boolean   bool
	Timestamp time.Time
	Date      civil.Date
	Time      civil.Time
	DateTime  civil.DateTime
	Numeric   *big.Rat
	Geography string

	// Repeated variants of the scalar fields above.
	StringArray    []string
	IntegerArray   []int64
	FloatArray     []float64
	BooleanArray   []bool
	TimestampArray []time.Time
	DateArray      []civil.Date
	TimeArray      []civil.Time
	DateTimeArray  []civil.DateTime
	NumericArray   []*big.Rat
	GeographyArray []string

	// Nested record and repeated nested record.
	Record      SubTestStruct
	RecordArray []SubTestStruct
}
1039
// roundToMicros is a cmp transformer that rounds times to the microsecond
// for comparison purposes.
var roundToMicros = cmp.Transformer("RoundToMicros",
	func(t time.Time) time.Time { return t.Round(time.Microsecond) })
1043
1044func TestIntegration_InsertAndReadStructs(t *testing.T) {
1045	if client == nil {
1046		t.Skip("Integration tests skipped")
1047	}
1048	schema, err := InferSchema(TestStruct{})
1049	if err != nil {
1050		t.Fatal(err)
1051	}
1052
1053	ctx := context.Background()
1054	table := newTable(t, schema)
1055	defer table.Delete(ctx)
1056
1057	d := civil.Date{Year: 2016, Month: 3, Day: 20}
1058	tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
1059	ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC)
1060	dtm := civil.DateTime{Date: d, Time: tm}
1061	d2 := civil.Date{Year: 1994, Month: 5, Day: 15}
1062	tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0}
1063	ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC)
1064	dtm2 := civil.DateTime{Date: d2, Time: tm2}
1065	g := "POINT(-122.350220 47.649154)"
1066	g2 := "POINT(-122.0836791 37.421827)"
1067
1068	// Populate the table.
1069	ins := table.Inserter()
1070	want := []*TestStruct{
1071		{
1072			"a",
1073			[]byte("byte"),
1074			42,
1075			3.14,
1076			true,
1077			ts,
1078			d,
1079			tm,
1080			dtm,
1081			big.NewRat(57, 100),
1082			g,
1083			[]string{"a", "b"},
1084			[]int64{1, 2},
1085			[]float64{1, 1.41},
1086			[]bool{true, false},
1087			[]time.Time{ts, ts2},
1088			[]civil.Date{d, d2},
1089			[]civil.Time{tm, tm2},
1090			[]civil.DateTime{dtm, dtm2},
1091			[]*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)},
1092			[]string{g, g2},
1093			SubTestStruct{
1094				"string",
1095				SubSubTestStruct{24},
1096				[]SubSubTestStruct{{1}, {2}},
1097			},
1098			[]SubTestStruct{
1099				{String: "empty"},
1100				{
1101					"full",
1102					SubSubTestStruct{1},
1103					[]SubSubTestStruct{{1}, {2}},
1104				},
1105			},
1106		},
1107		{
1108			Name:      "b",
1109			Bytes:     []byte("byte2"),
1110			Integer:   24,
1111			Float:     4.13,
1112			Boolean:   false,
1113			Timestamp: ts,
1114			Date:      d,
1115			Time:      tm,
1116			DateTime:  dtm,
1117			Numeric:   big.NewRat(4499, 10000),
1118		},
1119	}
1120	var savers []*StructSaver
1121	for _, s := range want {
1122		savers = append(savers, &StructSaver{Schema: schema, Struct: s})
1123	}
1124	if err := ins.Put(ctx, savers); err != nil {
1125		t.Fatal(putError(err))
1126	}
1127
1128	// Wait until the data has been uploaded. This can take a few seconds, according
1129	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
1130	if err := waitForRow(ctx, table); err != nil {
1131		t.Fatal(err)
1132	}
1133
1134	// Test iteration with structs.
1135	it := table.Read(ctx)
1136	var got []*TestStruct
1137	for {
1138		var g TestStruct
1139		err := it.Next(&g)
1140		if err == iterator.Done {
1141			break
1142		}
1143		if err != nil {
1144			t.Fatal(err)
1145		}
1146		got = append(got, &g)
1147	}
1148	sort.Sort(byName(got))
1149
1150	// BigQuery does not elide nils. It reports an error for nil fields.
1151	for i, g := range got {
1152		if i >= len(want) {
1153			t.Errorf("%d: got %v, past end of want", i, pretty.Value(g))
1154		} else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" {
1155			t.Errorf("%d: got=-, want=+:\n%s", i, diff)
1156		}
1157	}
1158}
1159
1160type byName []*TestStruct
1161
1162func (b byName) Len() int           { return len(b) }
1163func (b byName) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1164func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name }
1165
// TestIntegration_InsertAndReadNullable round-trips rows built from the Null*
// wrapper types in three configurations: an all-zero struct, a struct whose
// Null* fields are explicitly marked invalid, and a fully populated struct.
func TestIntegration_InsertAndReadNullable(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
	cdt := civil.DateTime{Date: testDate, Time: ctm}
	rat := big.NewRat(33, 100)
	geo := "POINT(-122.198939 47.669865)"

	// Nil fields in the struct.
	testInsertAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema)))

	// Explicitly invalidate the Null* types within the struct.
	// This should read back identically to the all-zero case above.
	testInsertAndReadNullable(t, testStructNullable{
		String:    NullString{Valid: false},
		Integer:   NullInt64{Valid: false},
		Float:     NullFloat64{Valid: false},
		Boolean:   NullBool{Valid: false},
		Timestamp: NullTimestamp{Valid: false},
		Date:      NullDate{Valid: false},
		Time:      NullTime{Valid: false},
		DateTime:  NullDateTime{Valid: false},
		Geography: NullGeography{Valid: false},
	},
		make([]Value, len(testStructNullableSchema)))

	// Populate the struct with values.
	testInsertAndReadNullable(t, testStructNullable{
		String:    NullString{"x", true},
		Bytes:     []byte{1, 2, 3},
		Integer:   NullInt64{1, true},
		Float:     NullFloat64{2.3, true},
		Boolean:   NullBool{true, true},
		Timestamp: NullTimestamp{testTimestamp, true},
		Date:      NullDate{testDate, true},
		Time:      NullTime{ctm, true},
		DateTime:  NullDateTime{cdt, true},
		Numeric:   rat,
		Geography: NullGeography{geo, true},
		Record:    &subNullable{X: NullInt64{4, true}},
	},
		[]Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, geo, []Value{int64(4)}})
}
1209
// testInsertAndReadNullable inserts a single testStructNullable row into a
// fresh table, waits for it to become visible, then reads it back twice: once
// as a []Value (compared against wantRow) and once as a struct (compared
// against the original ts).
func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
	ctx := context.Background()
	table := newTable(t, testStructNullableSchema)
	defer table.Delete(ctx)

	// Populate the table.
	ins := table.Inserter()
	if err := ins.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil {
		t.Fatal(putError(err))
	}
	// Wait until the data has been uploaded. This can take a few seconds, according
	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	// Read into a []Value.
	iter := table.Read(ctx)
	gotRows, _, _, err := readAll(iter)
	if err != nil {
		t.Fatal(err)
	}
	if len(gotRows) != 1 {
		t.Fatalf("got %d rows, want 1", len(gotRows))
	}
	if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" {
		t.Error(diff)
	}

	// Read into a struct.
	want := ts
	var sn testStructNullable
	it := table.Read(ctx)
	if err := it.Next(&sn); err != nil {
		t.Fatal(err)
	}
	if diff := testutil.Diff(sn, want, roundToMicros); diff != "" {
		t.Error(diff)
	}
}
1250
// TestIntegration_TableUpdate exercises table metadata updates: non-schema
// fields with etag-based optimistic concurrency, additive schema changes, and
// a set of schema modifications the service is expected to reject.
func TestIntegration_TableUpdate(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Test Update of non-schema fields.
	tm, err := table.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	wantDescription := tm.Description + "more"
	wantName := tm.Name + "more"
	wantExpiration := tm.ExpirationTime.Add(time.Hour * 24)
	// Update with the current etag; this should succeed.
	got, err := table.Update(ctx, TableMetadataToUpdate{
		Description:    wantDescription,
		Name:           wantName,
		ExpirationTime: wantExpiration,
	}, tm.ETag)
	if err != nil {
		t.Fatal(err)
	}
	if got.Description != wantDescription {
		t.Errorf("Description: got %q, want %q", got.Description, wantDescription)
	}
	if got.Name != wantName {
		t.Errorf("Name: got %q, want %q", got.Name, wantName)
	}
	if got.ExpirationTime != wantExpiration {
		t.Errorf("ExpirationTime: got %q, want %q", got.ExpirationTime, wantExpiration)
	}
	if !testutil.Equal(got.Schema, schema) {
		t.Errorf("Schema: got %v, want %v", pretty.Value(got.Schema), pretty.Value(schema))
	}

	// Blind write succeeds.
	// An empty etag skips the precondition check entirely.
	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "x"}, "")
	if err != nil {
		t.Fatal(err)
	}
	// Write with old etag fails.
	// got.ETag was invalidated by the blind write above.
	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "y"}, got.ETag)
	if err == nil {
		t.Fatal("Update with old ETag succeeded, wanted failure")
	}

	// Test schema update.
	// Columns can be added. schema2 is the same as schema, except for the
	// added column in the middle.
	nested := Schema{
		{Name: "nested", Type: BooleanFieldType},
		{Name: "other", Type: StringFieldType},
	}
	schema2 := Schema{
		schema[0],
		{Name: "rec2", Type: RecordFieldType, Schema: nested},
		schema[1],
		schema[2],
	}

	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: schema2}, "")
	if err != nil {
		t.Fatal(err)
	}

	// Wherever you add the column, it appears at the end.
	schema3 := Schema{schema2[0], schema2[2], schema2[3], schema2[1]}
	if !testutil.Equal(got.Schema, schema3) {
		t.Errorf("add field:\ngot  %v\nwant %v",
			pretty.Value(got.Schema), pretty.Value(schema3))
	}

	// Updating with the empty schema succeeds, but is a no-op.
	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema{}}, "")
	if err != nil {
		t.Fatal(err)
	}
	if !testutil.Equal(got.Schema, schema3) {
		t.Errorf("empty schema:\ngot  %v\nwant %v",
			pretty.Value(got.Schema), pretty.Value(schema3))
	}

	// Error cases when updating schema.
	// Each of these is an incompatible change that the service rejects with
	// a 400 Bad Request.
	for _, test := range []struct {
		desc   string
		fields Schema
	}{
		{"change from optional to required", Schema{
			{Name: "name", Type: StringFieldType, Required: true},
			schema3[1],
			schema3[2],
			schema3[3],
		}},
		{"add a required field", Schema{
			schema3[0], schema3[1], schema3[2], schema3[3],
			{Name: "req", Type: StringFieldType, Required: true},
		}},
		{"remove a field", Schema{schema3[0], schema3[1], schema3[2]}},
		{"remove a nested field", Schema{
			schema3[0], schema3[1], schema3[2],
			{Name: "rec2", Type: RecordFieldType, Schema: Schema{nested[0]}}}},
		{"remove all nested fields", Schema{
			schema3[0], schema3[1], schema3[2],
			{Name: "rec2", Type: RecordFieldType, Schema: Schema{}}}},
	} {
		_, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema(test.fields)}, "")
		if err == nil {
			t.Errorf("%s: want error, got nil", test.desc)
		} else if !hasStatusCode(err, 400) {
			t.Errorf("%s: want 400, got %v", test.desc, err)
		}
	}
}
1366
// TestIntegration_Load loads CSV data from an in-memory reader into a table,
// verifies the job's reported LoadConfig matches what was requested, and
// checks the loaded rows.
func TestIntegration_Load(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	// CSV data can't be loaded into a repeated field, so we use a different schema.
	table := newTable(t, Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "nums", Type: IntegerFieldType},
	})
	defer table.Delete(ctx)

	// Load the table from a reader.
	r := strings.NewReader("a,0\nb,1\nc,2\n")
	wantRows := [][]Value{
		{"a", int64(0)},
		{"b", int64(1)},
		{"c", int64(2)},
	}
	rs := NewReaderSource(r)
	loader := table.LoaderFrom(rs)
	loader.WriteDisposition = WriteTruncate
	loader.Labels = map[string]string{"test": "go"}
	job, err := loader.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if job.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	// The job's configuration should reflect the loader's settings.
	conf, err := job.Config()
	if err != nil {
		t.Fatal(err)
	}
	config, ok := conf.(*LoadConfig)
	if !ok {
		t.Fatalf("got %T, want LoadConfig", conf)
	}
	diff := testutil.Diff(config, &loader.LoadConfig,
		cmp.AllowUnexported(Table{}),
		cmpopts.IgnoreUnexported(Client{}, ReaderSource{}),
		// returned schema is at top level, not in the config
		cmpopts.IgnoreFields(FileConfig{}, "Schema"))
	if diff != "" {
		t.Errorf("got=-, want=+:\n%s", diff)
	}
	if err := wait(ctx, job); err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows)

}
1419
// TestIntegration_DML inserts rows via a DML INSERT statement (rather than
// the streaming API) and verifies they can be read back.
func TestIntegration_DML(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec)
						VALUES ('a', [0], STRUCT<BOOL>(TRUE)),
							   ('b', [1], STRUCT<BOOL>(FALSE)),
							   ('c', [2], STRUCT<BOOL>(TRUE))`,
		table.DatasetID, table.TableID)
	if err := runQueryJob(ctx, sql); err != nil {
		t.Fatal(err)
	}
	// Arrays come back as []Value; the record comes back as a []Value of its fields.
	wantRows := [][]Value{
		{"a", []Value{int64(0)}, []Value{true}},
		{"b", []Value{int64(1)}, []Value{false}},
		{"c", []Value{int64(2)}, []Value{true}},
	}
	checkRead(t, "DML", table.Read(ctx), wantRows)
}
1443
1444// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
1445func runQueryJob(ctx context.Context, sql string) error {
1446	return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
1447		job, err := client.Query(sql).Run(ctx)
1448		if err != nil {
1449			if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
1450				return true, err // fail on 4xx
1451			}
1452			return false, err
1453		}
1454		_, err = job.Wait(ctx)
1455		if err != nil {
1456			if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
1457				return true, err // fail on 4xx
1458			}
1459			return false, err
1460		}
1461		return true, nil
1462	})
1463}
1464
// TestIntegration_TimeTypes round-trips DATE, TIME, DATETIME and TIMESTAMP
// values through both the streaming inserter and a SQL INSERT, verifying that
// the civil.* and time.Time representations survive each path.
func TestIntegration_TimeTypes(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	dtSchema := Schema{
		{Name: "d", Type: DateFieldType},
		{Name: "t", Type: TimeFieldType},
		{Name: "dt", Type: DateTimeFieldType},
		{Name: "ts", Type: TimestampFieldType},
	}
	table := newTable(t, dtSchema)
	defer table.Delete(ctx)

	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000}
	dtm := civil.DateTime{Date: d, Time: tm}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	wantRows := [][]Value{
		{d, tm, dtm, ts},
	}
	// First path: streaming insert.
	ins := table.Inserter()
	if err := ins.Put(ctx, []*ValuesSaver{
		{Schema: dtSchema, Row: wantRows[0]},
	}); err != nil {
		t.Fatal(putError(err))
	}
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	// Second path: SQL INSERT of the same row.
	// SQL wants DATETIMEs with a space between date and time, but the service
	// returns them in RFC3339 form, with a "T" between.
	query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+
		"VALUES ('%s', '%s', '%s', '%s')",
		table.DatasetID, table.TableID,
		d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
	if err := runQueryJob(ctx, query); err != nil {
		t.Fatal(err)
	}
	// Both paths inserted the same row, so expect it twice.
	wantRows = append(wantRows, wantRows[0])
	checkRead(t, "TimeTypes", table.Read(ctx), wantRows)
}
1508
// TestIntegration_StandardQuery runs a set of standard-SQL SELECT expressions
// and verifies that each BigQuery type is mapped to the expected Go value
// (int64, float64, *big.Rat, civil types, nested []Value, and so on).
func TestIntegration_StandardQuery(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	dtm := ts.Format("2006-01-02 15:04:05")

	// Constructs Value slices made up of int64s.
	ints := func(args ...int) []Value {
		vals := make([]Value, len(args))
		for i, arg := range args {
			vals[i] = int64(arg)
		}
		return vals
	}

	testCases := []struct {
		query   string
		wantRow []Value
	}{
		{"SELECT 1", ints(1)},
		{"SELECT 1.3", []Value{1.3}},
		{"SELECT CAST(1.3  AS NUMERIC)", []Value{big.NewRat(13, 10)}},
		{"SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}},
		{"SELECT TRUE", []Value{true}},
		{"SELECT 'ABC'", []Value{"ABC"}},
		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
		{fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}},
		{fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}},
		{fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}},
		{fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}},
		{fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}},
		{fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}},
		{"SELECT (1, 2)", []Value{ints(1, 2)}},
		{"SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}},
		{"SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}},
		{"SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}},
		{"SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}},
		{"SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}},
	}
	for _, c := range testCases {
		q := client.Query(c.query)
		it, err := q.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}
		checkRead(t, "StandardQuery", it, [][]Value{c.wantRow})
	}
}
1562
// TestIntegration_LegacyQuery runs a set of legacy-SQL SELECT expressions and
// verifies the resulting Go values. Note that under legacy SQL, DATE and TIME
// come back as plain strings rather than civil types.
func TestIntegration_LegacyQuery(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	dtm := ts.Format("2006-01-02 15:04:05")

	testCases := []struct {
		query   string
		wantRow []Value
	}{
		{"SELECT 1", []Value{int64(1)}},
		{"SELECT 1.3", []Value{1.3}},
		{"SELECT TRUE", []Value{true}},
		{"SELECT 'ABC'", []Value{"ABC"}},
		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
		{fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}},
		{fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}},
		{fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}},
	}
	for _, c := range testCases {
		q := client.Query(c.query)
		q.UseLegacySQL = true
		it, err := q.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}
		checkRead(t, "LegacyQuery", it, [][]Value{c.wantRow})
	}
}
1595
// TestIntegration_QueryParameters binds Go values of every supported type as
// query parameters, checks the row values that come back, and also checks the
// parameter values reported by the job's configuration.
func TestIntegration_QueryParameters(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
	rtm := tm
	rtm.Nanosecond = 3000 // round to microseconds
	dtm := civil.DateTime{Date: d, Time: tm}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	rat := big.NewRat(13, 10)

	type ss struct {
		String string
	}

	type s struct {
		Timestamp      time.Time
		StringArray    []string
		SubStruct      ss
		SubStructArray []ss
	}

	// wantRow is the expected query result; wantConfig is the expected
	// parameter value as echoed back by job.Config (structs become maps).
	testCases := []struct {
		query      string
		parameters []QueryParameter
		wantRow    []Value
		wantConfig interface{}
	}{
		{
			"SELECT @val",
			[]QueryParameter{{"val", 1}},
			[]Value{int64(1)},
			int64(1),
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", 1.3}},
			[]Value{1.3},
			1.3,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", rat}},
			[]Value{rat},
			rat,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", true}},
			[]Value{true},
			true,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", "ABC"}},
			[]Value{"ABC"},
			"ABC",
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", []byte("foo")}},
			[]Value{[]byte("foo")},
			[]byte("foo"),
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", ts}},
			[]Value{ts},
			ts,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", []time.Time{ts, ts}}},
			[]Value{[]Value{ts, ts}},
			[]interface{}{ts, ts},
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", dtm}},
			[]Value{civil.DateTime{Date: d, Time: rtm}},
			civil.DateTime{Date: d, Time: rtm},
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", d}},
			[]Value{d},
			d,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", tm}},
			[]Value{rtm},
			rtm,
		},
		{
			"SELECT @val",
			[]QueryParameter{{"val", s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}},
			[]Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}},
			map[string]interface{}{
				"Timestamp":   ts,
				"StringArray": []interface{}{"a", "b"},
				"SubStruct":   map[string]interface{}{"String": "c"},
				"SubStructArray": []interface{}{
					map[string]interface{}{"String": "d"},
					map[string]interface{}{"String": "e"},
				},
			},
		},
		{
			"SELECT @val.Timestamp, @val.SubStruct.String",
			[]QueryParameter{{"val", s{Timestamp: ts, SubStruct: ss{"a"}}}},
			[]Value{ts, "a"},
			map[string]interface{}{
				"Timestamp":      ts,
				"SubStruct":      map[string]interface{}{"String": "a"},
				"StringArray":    nil,
				"SubStructArray": nil,
			},
		},
	}
	for _, c := range testCases {
		q := client.Query(c.query)
		q.Parameters = c.parameters
		job, err := q.Run(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if job.LastStatus() == nil {
			t.Error("no LastStatus")
		}
		it, err := job.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}
		checkRead(t, "QueryParameters", it, [][]Value{c.wantRow})
		// Verify the job configuration echoes the parameter value back.
		config, err := job.Config()
		if err != nil {
			t.Fatal(err)
		}
		got := config.(*QueryConfig).Parameters[0].Value
		if !testutil.Equal(got, c.wantConfig) {
			t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
				c.parameters[0].Value, got, c.wantConfig)
		}
	}
}
1745
1746func TestIntegration_QueryDryRun(t *testing.T) {
1747	if client == nil {
1748		t.Skip("Integration tests skipped")
1749	}
1750	ctx := context.Background()
1751	q := client.Query("SELECT word from " + stdName + " LIMIT 10")
1752	q.DryRun = true
1753	job, err := q.Run(ctx)
1754	if err != nil {
1755		t.Fatal(err)
1756	}
1757
1758	s := job.LastStatus()
1759	if s.State != Done {
1760		t.Errorf("state is %v, expected Done", s.State)
1761	}
1762	if s.Statistics == nil {
1763		t.Fatal("no statistics")
1764	}
1765	if s.Statistics.Details.(*QueryStatistics).Schema == nil {
1766		t.Fatal("no schema")
1767	}
1768	if s.Statistics.Details.(*QueryStatistics).TotalBytesProcessedAccuracy == "" {
1769		t.Fatal("no cost accuracy")
1770	}
1771}
1772
1773func TestIntegration_Scripting(t *testing.T) {
1774	if client == nil {
1775		t.Skip("Integration tests skipped")
1776	}
1777	ctx := context.Background()
1778	sql := `
1779	-- Declare a variable to hold names as an array.
1780	DECLARE top_names ARRAY<STRING>;
1781	-- Build an array of the top 100 names from the year 2017.
1782	SET top_names = (
1783	  SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100)
1784	  FROM ` + "`bigquery-public-data`" + `.usa_names.usa_1910_current
1785	  WHERE year = 2017
1786	);
1787	-- Which names appear as words in Shakespeare's plays?
1788	SELECT
1789	  name AS shakespeare_name
1790	FROM UNNEST(top_names) AS name
1791	WHERE name IN (
1792	  SELECT word
1793	  FROM ` + "`bigquery-public-data`" + `.samples.shakespeare
1794	);
1795	`
1796	q := client.Query(sql)
1797	job, err := q.Run(ctx)
1798	if err != nil {
1799		t.Fatalf("failed to run parent job: %v", err)
1800	}
1801	status, err := job.Wait(ctx)
1802	if err != nil {
1803		t.Fatalf("failed to wait for completion: %v", err)
1804	}
1805	if status.Err() != nil {
1806		t.Fatalf("job terminated with error: %v", err)
1807	}
1808
1809	queryStats, ok := status.Statistics.Details.(*QueryStatistics)
1810	if !ok {
1811		t.Fatalf("failed to fetch query statistics")
1812	}
1813
1814	want := "SCRIPT"
1815	if queryStats.StatementType != want {
1816		t.Errorf("statement type mismatch. got %s want %s", queryStats.StatementType, want)
1817	}
1818
1819	if status.Statistics.NumChildJobs <= 0 {
1820		t.Errorf("expected script to indicate nonzero child jobs, got %d", status.Statistics.NumChildJobs)
1821	}
1822
1823	// Ensure child jobs are present.
1824	var childJobs []*Job
1825
1826	it := job.Children(ctx)
1827	for {
1828		job, err := it.Next()
1829		if err == iterator.Done {
1830			break
1831		}
1832		if err != nil {
1833			t.Fatal(err)
1834		}
1835		childJobs = append(childJobs, job)
1836	}
1837	if len(childJobs) == 0 {
1838		t.Fatal("Script had no child jobs.")
1839	}
1840
1841	for _, cj := range childJobs {
1842		cStatus := cj.LastStatus()
1843		if cStatus.Statistics.ParentJobID != job.ID() {
1844			t.Errorf("child job %q doesn't indicate parent.  got %q, want %q", cj.ID(), cStatus.Statistics.ParentJobID, job.ID())
1845		}
1846		if cStatus.Statistics.ScriptStatistics == nil {
1847			t.Errorf("child job %q doesn't have script statistics present", cj.ID())
1848		}
1849		if cStatus.Statistics.ScriptStatistics.EvaluationKind == "" {
1850			t.Errorf("child job %q didn't indicate evaluation kind", cj.ID())
1851		}
1852	}
1853
1854}
1855
// TestIntegration_ExtractExternal extracts a table to a CSV object in GCS,
// then queries that object two ways: via an ad-hoc TableDefinition and via a
// permanent external table, checking rows and metadata in both cases.
func TestIntegration_ExtractExternal(t *testing.T) {
	// Create a table, extract it to GCS, then query it externally.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	schema := Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "num", Type: IntegerFieldType},
	}
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Insert table data.
	sql := fmt.Sprintf(`INSERT %s.%s (name, num)
		                VALUES ('a', 1), ('b', 2), ('c', 3)`,
		table.DatasetID, table.TableID)
	if err := runQueryJob(ctx, sql); err != nil {
		t.Fatal(err)
	}
	// Extract to a GCS object as CSV.
	bucketName := testutil.ProjID()
	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
	gr := NewGCSReference(uri)
	gr.DestinationFormat = CSV
	e := table.ExtractorTo(gr)
	job, err := e.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// The job's configuration should reflect the extractor's settings.
	conf, err := job.Config()
	if err != nil {
		t.Fatal(err)
	}
	config, ok := conf.(*ExtractConfig)
	if !ok {
		t.Fatalf("got %T, want ExtractConfig", conf)
	}
	diff := testutil.Diff(config, &e.ExtractConfig,
		cmp.AllowUnexported(Table{}),
		cmpopts.IgnoreUnexported(Client{}))
	if diff != "" {
		t.Errorf("got=-, want=+:\n%s", diff)
	}
	if err := wait(ctx, job); err != nil {
		t.Fatal(err)
	}

	edc := &ExternalDataConfig{
		SourceFormat: CSV,
		SourceURIs:   []string{uri},
		Schema:       schema,
		Options: &CSVOptions{
			SkipLeadingRows: 1,
			// This is the default. Since we use edc as an expectation later on,
			// let's just be explicit.
			FieldDelimiter: ",",
		},
	}
	// Query that CSV file directly.
	q := client.Query("SELECT * FROM csv")
	q.TableDefinitions = map[string]ExternalData{"csv": edc}
	wantRows := [][]Value{
		{"a", int64(1)},
		{"b", int64(2)},
		{"c", int64(3)},
	}
	iter, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "external query", iter, wantRows)

	// Make a table pointing to the file, and query it.
	// BigQuery does not allow a Table.Read on an external table.
	table = dataset.Table(tableIDs.New())
	err = table.Create(context.Background(), &TableMetadata{
		Schema:             schema,
		ExpirationTime:     testTableExpiration,
		ExternalDataConfig: edc,
	})
	if err != nil {
		t.Fatal(err)
	}
	q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
	iter, err = q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "external table", iter, wantRows)

	// While we're here, check that the table metadata is correct.
	md, err := table.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// One difference: since BigQuery returns the schema as part of the ordinary
	// table metadata, it does not populate ExternalDataConfig.Schema.
	md.ExternalDataConfig.Schema = md.Schema
	if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" {
		t.Errorf("got=-, want=+\n%s", diff)
	}
}
1961
// TestIntegration_ReadNullIntoStruct inserts a row with a NULL column, then
// reads it into a struct with a non-nullable field and expects an error.
func TestIntegration_ReadNullIntoStruct(t *testing.T) {
	// Reading a null into a struct field should return an error (not panic).
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// The "name" column is NULL in this row.
	ins := table.Inserter()
	row := &ValuesSaver{
		Schema: schema,
		Row:    []Value{nil, []Value{}, []Value{nil}},
	}
	if err := ins.Put(ctx, []*ValuesSaver{row}); err != nil {
		t.Fatal(putError(err))
	}
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	q := client.Query(fmt.Sprintf("select name from %s", table.TableID))
	q.DefaultProjectID = dataset.ProjectID
	q.DefaultDatasetID = dataset.DatasetID
	it, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// S.Name is a plain string, so the NULL cannot be represented.
	type S struct{ Name string }
	var s S
	if err := it.Next(&s); err == nil {
		t.Fatal("got nil, want error")
	}
}
1996
// Fully-qualified names for the public Shakespeare sample table, in standard
// and legacy SQL quoting syntax respectively.
const (
	stdName    = "`bigquery-public-data.samples.shakespeare`"
	legacyName = "[bigquery-public-data:samples.shakespeare]"
)
2001
// These tests exploit the fact that the two SQL versions have different syntaxes for
// fully-qualified table names: a query only succeeds when the table-name
// syntax matches the SQL dialect actually in effect.
var useLegacySQLTests = []struct {
	t           string // name of table
	std, legacy bool   // use standard/legacy SQL
	err         bool   // do we expect an error?
}{
	{t: legacyName, std: false, legacy: true, err: false},
	{t: legacyName, std: true, legacy: false, err: true},
	{t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default
	{t: legacyName, std: true, legacy: true, err: true},   // setting both flags is an error
	{t: stdName, std: false, legacy: true, err: true},
	{t: stdName, std: true, legacy: false, err: false},
	{t: stdName, std: false, legacy: false, err: false}, // standard SQL is default
	{t: stdName, std: true, legacy: true, err: true},    // setting both flags is an error
}
2018
// TestIntegration_QueryUseLegacySQL runs each useLegacySQLTests case as a
// query and checks that success/failure matches the expected dialect rules.
func TestIntegration_QueryUseLegacySQL(t *testing.T) {
	// Test the UseLegacySQL and UseStandardSQL options for queries.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	for _, test := range useLegacySQLTests {
		q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t))
		q.UseStandardSQL = test.std
		q.UseLegacySQL = test.legacy
		_, err := q.Read(ctx)
		gotErr := err != nil
		if gotErr && !test.err {
			t.Errorf("%+v:\nunexpected error: %v", test, err)
		} else if !gotErr && test.err {
			t.Errorf("%+v:\nsucceeded, but want error", test)
		}
	}
}
2038
// TestIntegration_TableUseLegacySQL runs each useLegacySQLTests case as a
// view creation and checks that success/failure matches the dialect rules.
func TestIntegration_TableUseLegacySQL(t *testing.T) {
	// Test UseLegacySQL and UseStandardSQL for Table.Create.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)
	for i, test := range useLegacySQLTests {
		view := dataset.Table(fmt.Sprintf("t_view_%d", i))
		tm := &TableMetadata{
			ViewQuery:      fmt.Sprintf("SELECT word from %s", test.t),
			UseStandardSQL: test.std,
			UseLegacySQL:   test.legacy,
		}
		err := view.Create(ctx, tm)
		gotErr := err != nil
		if gotErr && !test.err {
			t.Errorf("%+v:\nunexpected error: %v", test, err)
		} else if !gotErr && test.err {
			t.Errorf("%+v:\nsucceeded, but want error", test)
		}
		// Best-effort cleanup; the view may not exist in error cases.
		_ = view.Delete(ctx)
	}
}
2064
2065func TestIntegration_ListJobs(t *testing.T) {
2066	// It's difficult to test the list of jobs, because we can't easily
2067	// control what's in it. Also, there are many jobs in the test project,
2068	// and it takes considerable time to list them all.
2069	if client == nil {
2070		t.Skip("Integration tests skipped")
2071	}
2072	ctx := context.Background()
2073
2074	// About all we can do is list a few jobs.
2075	const max = 20
2076	var jobs []*Job
2077	it := client.Jobs(ctx)
2078	for {
2079		job, err := it.Next()
2080		if err == iterator.Done {
2081			break
2082		}
2083		if err != nil {
2084			t.Fatal(err)
2085		}
2086		jobs = append(jobs, job)
2087		if len(jobs) >= max {
2088			break
2089		}
2090	}
2091	// We expect that there is at least one job in the last few months.
2092	if len(jobs) == 0 {
2093		t.Fatal("did not get any jobs")
2094	}
2095}
2096
// tokyo is the BigQuery location used by the location tests below.
const tokyo = "asia-northeast1"
2098
2099func TestIntegration_Location(t *testing.T) {
2100	if client == nil {
2101		t.Skip("Integration tests skipped")
2102	}
2103	client.Location = ""
2104	testLocation(t, tokyo)
2105	client.Location = tokyo
2106	defer func() {
2107		client.Location = ""
2108	}()
2109	testLocation(t, "")
2110}
2111
// testLocation verifies location handling for datasets, jobs, queries,
// copies and extracts in the Tokyo location. loc is the location set
// explicitly on each operation; when loc is empty, the operations rely
// on the client's Location field, which the caller
// (TestIntegration_Location) has set to tokyo beforehand.
func testLocation(t *testing.T, loc string) {
	ctx := context.Background()
	tokyoDataset := client.Dataset("tokyo")
	err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc})
	if err != nil && !hasStatusCode(err, 409) { // 409 = already exists
		t.Fatal(err)
	}
	// Whether created just now or left over from an earlier run, the
	// dataset must report the Tokyo location.
	md, err := tokyoDataset.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if md.Location != tokyo {
		t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo)
	}
	table := tokyoDataset.Table(tableIDs.New())
	err = table.Create(context.Background(), &TableMetadata{
		Schema: Schema{
			{Name: "name", Type: StringFieldType},
			{Name: "nums", Type: IntegerFieldType},
		},
		ExpirationTime: testTableExpiration,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer table.Delete(ctx)
	// A load job into the Tokyo table must come back with the Tokyo
	// location regardless of how the location was supplied.
	loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n")))
	loader.Location = loc
	job, err := loader.Run(ctx)
	if err != nil {
		t.Fatal("loader.Run", err)
	}
	if job.Location() != tokyo {
		t.Fatalf("job location: got %s, want %s", job.Location(), tokyo)
	}
	// JobFromID uses the client's default location: it should fail when
	// the client has no location, and succeed when the client is set to
	// Tokyo.
	_, err = client.JobFromID(ctx, job.ID())
	if client.Location == "" && err == nil {
		t.Error("JobFromID with Tokyo job, no client location: want error, got nil")
	}
	if client.Location != "" && err != nil {
		t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err)
	}
	// Looking up the Tokyo job in the wrong location must fail.
	_, err = client.JobFromIDLocation(ctx, job.ID(), "US")
	if err == nil {
		t.Error("JobFromIDLocation with US: want error, got nil")
	}
	// Looking it up with an explicit location succeeds only when that
	// location is actually Tokyo.
	job2, err := client.JobFromIDLocation(ctx, job.ID(), loc)
	if loc == tokyo && err != nil {
		t.Errorf("loc=tokyo: %v", err)
	}
	if loc == "" && err == nil {
		t.Error("loc empty: got nil, want error")
	}
	if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) {
		t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo)
	}
	if err := wait(ctx, job); err != nil {
		t.Fatal(err)
	}
	// Cancel should succeed even if the job is done.
	if err := job.Cancel(ctx); err != nil {
		t.Fatal(err)
	}

	// A query against the Tokyo table must also carry the location and
	// return the rows the load job inserted.
	q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
	q.Location = loc
	iter, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	wantRows := [][]Value{
		{"a", int64(0)},
		{"b", int64(1)},
		{"c", int64(2)},
	}
	checkRead(t, "location", iter, wantRows)

	// Copy and extract jobs must run with the location as well.
	table2 := tokyoDataset.Table(tableIDs.New())
	copier := table2.CopierFrom(table)
	copier.Location = loc
	if _, err := copier.Run(ctx); err != nil {
		t.Fatal(err)
	}
	bucketName := testutil.ProjID()
	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
	gr := NewGCSReference(uri)
	gr.DestinationFormat = CSV
	e := table.ExtractorTo(gr)
	e.Location = loc
	if _, err := e.Run(ctx); err != nil {
		t.Fatal(err)
	}
}
2207
2208func TestIntegration_NumericErrors(t *testing.T) {
2209	// Verify that the service returns an error for a big.Rat that's too large.
2210	if client == nil {
2211		t.Skip("Integration tests skipped")
2212	}
2213	ctx := context.Background()
2214	schema := Schema{{Name: "n", Type: NumericFieldType}}
2215	table := newTable(t, schema)
2216	defer table.Delete(ctx)
2217	tooBigRat := &big.Rat{}
2218	if _, ok := tooBigRat.SetString("1e40"); !ok {
2219		t.Fatal("big.Rat.SetString failed")
2220	}
2221	ins := table.Inserter()
2222	err := ins.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}})
2223	if err == nil {
2224		t.Fatal("got nil, want error")
2225	}
2226}
2227
2228func TestIntegration_QueryErrors(t *testing.T) {
2229	// Verify that a bad query returns an appropriate error.
2230	if client == nil {
2231		t.Skip("Integration tests skipped")
2232	}
2233	ctx := context.Background()
2234	q := client.Query("blah blah broken")
2235	_, err := q.Read(ctx)
2236	const want = "invalidQuery"
2237	if !strings.Contains(err.Error(), want) {
2238		t.Fatalf("got %q, want substring %q", err, want)
2239	}
2240}
2241
2242func TestIntegration_ModelLifecycle(t *testing.T) {
2243	if client == nil {
2244		t.Skip("Integration tests skipped")
2245	}
2246	ctx := context.Background()
2247
2248	// Create a model via a CREATE MODEL query
2249	modelID := modelIDs.New()
2250	model := dataset.Model(modelID)
2251	modelRef := fmt.Sprintf("%s.%s.%s", dataset.ProjectID, dataset.DatasetID, modelID)
2252
2253	sql := fmt.Sprintf(`
2254		CREATE MODEL `+"`%s`"+`
2255		OPTIONS (
2256			model_type='linear_reg',
2257			max_iteration=1,
2258			learn_rate=0.4,
2259			learn_rate_strategy='constant'
2260		) AS (
2261			SELECT 'a' AS f1, 2.0 AS label
2262			UNION ALL
2263			SELECT 'b' AS f1, 3.8 AS label
2264		)`, modelRef)
2265	if err := runQueryJob(ctx, sql); err != nil {
2266		t.Fatal(err)
2267	}
2268	defer model.Delete(ctx)
2269
2270	// Get the model metadata.
2271	curMeta, err := model.Metadata(ctx)
2272	if err != nil {
2273		t.Fatalf("couldn't get metadata: %v", err)
2274	}
2275
2276	want := "LINEAR_REGRESSION"
2277	if curMeta.Type != want {
2278		t.Errorf("Model type mismatch.  Want %s got %s", curMeta.Type, want)
2279	}
2280
2281	// Ensure training metadata is available.
2282	runs := curMeta.RawTrainingRuns()
2283	if runs == nil {
2284		t.Errorf("training runs unpopulated.")
2285	}
2286	labelCols, err := curMeta.RawLabelColumns()
2287	if err != nil {
2288		t.Fatalf("failed to get label cols: %v", err)
2289	}
2290	if labelCols == nil {
2291		t.Errorf("label column information unpopulated.")
2292	}
2293	featureCols, err := curMeta.RawFeatureColumns()
2294	if err != nil {
2295		t.Fatalf("failed to get feature cols: %v", err)
2296	}
2297	if featureCols == nil {
2298		t.Errorf("feature column information unpopulated.")
2299	}
2300
2301	// Update mutable fields via API.
2302	expiry := time.Now().Add(24 * time.Hour).Truncate(time.Millisecond)
2303
2304	upd := ModelMetadataToUpdate{
2305		Description:    "new",
2306		Name:           "friendly",
2307		ExpirationTime: expiry,
2308	}
2309
2310	newMeta, err := model.Update(ctx, upd, curMeta.ETag)
2311	if err != nil {
2312		t.Fatalf("failed to update: %v", err)
2313	}
2314
2315	want = "new"
2316	if newMeta.Description != want {
2317		t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
2318	}
2319	want = "friendly"
2320	if newMeta.Name != want {
2321		t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
2322	}
2323	if newMeta.ExpirationTime != expiry {
2324		t.Fatalf("ExpirationTime not updated.  got %v want %v", newMeta.ExpirationTime, expiry)
2325	}
2326
2327	// Ensure presence when enumerating the model list.
2328	it := dataset.Models(ctx)
2329	seen := false
2330	for {
2331		mdl, err := it.Next()
2332		if err == iterator.Done {
2333			break
2334		}
2335		if err != nil {
2336			t.Fatal(err)
2337		}
2338		if mdl.ModelID == modelID {
2339			seen = true
2340		}
2341	}
2342	if !seen {
2343		t.Fatal("model not listed in dataset")
2344	}
2345
2346	// Delete the model.
2347	if err := model.Delete(ctx); err != nil {
2348		t.Fatalf("failed to delete model: %v", err)
2349	}
2350}
2351
2352func TestIntegration_RoutineScalarUDF(t *testing.T) {
2353	if client == nil {
2354		t.Skip("Integration tests skipped")
2355	}
2356	ctx := context.Background()
2357
2358	// Create a scalar UDF routine via API.
2359	routineID := routineIDs.New()
2360	routine := dataset.Routine(routineID)
2361	err := routine.Create(ctx, &RoutineMetadata{
2362		Type:     "SCALAR_FUNCTION",
2363		Language: "SQL",
2364		Body:     "x * 3",
2365		Arguments: []*RoutineArgument{
2366			{
2367				Name: "x",
2368				DataType: &StandardSQLDataType{
2369					TypeKind: "INT64",
2370				},
2371			},
2372		},
2373	})
2374	if err != nil {
2375		t.Fatalf("Create: %v", err)
2376	}
2377}
2378
2379func TestIntegration_RoutineComplexTypes(t *testing.T) {
2380	if client == nil {
2381		t.Skip("Integration tests skipped")
2382	}
2383	ctx := context.Background()
2384
2385	routineID := routineIDs.New()
2386	routine := dataset.Routine(routineID)
2387	sql := fmt.Sprintf(`
2388		CREATE FUNCTION `+"`%s`("+`
2389			arr ARRAY<STRUCT<name STRING, val INT64>>
2390		  ) AS (
2391			  (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
2392		  )`,
2393		routine.FullyQualifiedName())
2394	if err := runQueryJob(ctx, sql); err != nil {
2395		t.Fatal(err)
2396	}
2397	defer routine.Delete(ctx)
2398
2399	meta, err := routine.Metadata(ctx)
2400	if err != nil {
2401		t.Fatalf("Metadata: %v", err)
2402	}
2403	if meta.Type != "SCALAR_FUNCTION" {
2404		t.Fatalf("routine type mismatch, got %s want SCALAR_FUNCTION", meta.Type)
2405	}
2406	if meta.Language != "SQL" {
2407		t.Fatalf("language type mismatch, got  %s want SQL", meta.Language)
2408	}
2409	want := []*RoutineArgument{
2410		{
2411			Name: "arr",
2412			DataType: &StandardSQLDataType{
2413				TypeKind: "ARRAY",
2414				ArrayElementType: &StandardSQLDataType{
2415					TypeKind: "STRUCT",
2416					StructType: &StandardSQLStructType{
2417						Fields: []*StandardSQLField{
2418							{
2419								Name: "name",
2420								Type: &StandardSQLDataType{
2421									TypeKind: "STRING",
2422								},
2423							},
2424							{
2425								Name: "val",
2426								Type: &StandardSQLDataType{
2427									TypeKind: "INT64",
2428								},
2429							},
2430						},
2431					},
2432				},
2433			},
2434		},
2435	}
2436	if diff := testutil.Diff(meta.Arguments, want); diff != "" {
2437		t.Fatalf("%+v: -got, +want:\n%s", meta.Arguments, diff)
2438	}
2439}
2440
// TestIntegration_RoutineLifecycle exercises the full routine lifecycle:
// creation via a CREATE FUNCTION query, metadata retrieval, metadata
// update, listing, and deletion.
func TestIntegration_RoutineLifecycle(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	// Create a scalar UDF routine via a CREATE FUNCTION query
	routineID := routineIDs.New()
	routine := dataset.Routine(routineID)

	sql := fmt.Sprintf(`
		CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
		routine.FullyQualifiedName())
	if err := runQueryJob(ctx, sql); err != nil {
		t.Fatal(err)
	}
	defer routine.Delete(ctx)

	// Get the routine metadata.
	curMeta, err := routine.Metadata(ctx)
	if err != nil {
		t.Fatalf("couldn't get metadata: %v", err)
	}

	want := "SCALAR_FUNCTION"
	if curMeta.Type != want {
		t.Errorf("Routine type mismatch.  got %s want %s", curMeta.Type, want)
	}

	want = "SQL"
	if curMeta.Language != want {
		t.Errorf("Language mismatch. got %s want %s", curMeta.Language, want)
	}

	// Perform an update to change the routine body and description.
	want = "x * 4"
	wantDescription := "an updated description"
	// during beta, update doesn't allow partial updates.  Provide all fields.
	newMeta, err := routine.Update(ctx, &RoutineMetadataToUpdate{
		Body:        want,
		Arguments:   curMeta.Arguments,
		Description: wantDescription,
		ReturnType:  curMeta.ReturnType,
		Type:        curMeta.Type,
	}, curMeta.ETag)
	if err != nil {
		t.Fatalf("Update: %v", err)
	}
	if newMeta.Body != want {
		t.Fatalf("Update body failed. want %s got %s", want, newMeta.Body)
	}
	if newMeta.Description != wantDescription {
		t.Fatalf("Update description failed. want %s got %s", wantDescription, newMeta.Description)
	}

	// Ensure presence when enumerating the routine list.
	it := dataset.Routines(ctx)
	seen := false
	for {
		r, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		if r.RoutineID == routineID {
			seen = true
		}
	}
	if !seen {
		t.Fatal("routine not listed in dataset")
	}

	// Delete the routine.
	if err := routine.Delete(ctx); err != nil {
		t.Fatalf("failed to delete routine: %v", err)
	}
}
2520
2521// Creates a new, temporary table with a unique name and the given schema.
2522func newTable(t *testing.T, s Schema) *Table {
2523	table := dataset.Table(tableIDs.New())
2524	err := table.Create(context.Background(), &TableMetadata{
2525		Schema:         s,
2526		ExpirationTime: testTableExpiration,
2527	})
2528	if err != nil {
2529		t.Fatal(err)
2530	}
2531	return table
2532}
2533
2534func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) {
2535	if msg2, ok := compareRead(it, want, false); !ok {
2536		t.Errorf("%s: %s", msg, msg2)
2537	}
2538}
2539
2540func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) {
2541	if msg2, ok := compareRead(it, want, true); !ok {
2542		t.Errorf("%s: %s", msg, msg2)
2543	}
2544}
2545
2546func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) {
2547	got, _, totalRows, err := readAll(it)
2548	if err != nil {
2549		return err.Error(), false
2550	}
2551	if len(got) != len(want) {
2552		return fmt.Sprintf("got %d rows, want %d", len(got), len(want)), false
2553	}
2554	if compareTotalRows && len(got) != int(totalRows) {
2555		return fmt.Sprintf("got %d rows, but totalRows = %d", len(got), totalRows), false
2556	}
2557	sort.Sort(byCol0(got))
2558	for i, r := range got {
2559		gotRow := []Value(r)
2560		wantRow := want[i]
2561		if !testutil.Equal(gotRow, wantRow) {
2562			return fmt.Sprintf("#%d: got %#v, want %#v", i, gotRow, wantRow), false
2563		}
2564	}
2565	return "", true
2566}
2567
2568func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) {
2569	var (
2570		rows      [][]Value
2571		schema    Schema
2572		totalRows uint64
2573	)
2574	for {
2575		var vals []Value
2576		err := it.Next(&vals)
2577		if err == iterator.Done {
2578			return rows, schema, totalRows, nil
2579		}
2580		if err != nil {
2581			return nil, nil, 0, err
2582		}
2583		rows = append(rows, vals)
2584		schema = it.Schema
2585		totalRows = it.TotalRows
2586	}
2587}
2588
2589type byCol0 [][]Value
2590
2591func (b byCol0) Len() int      { return len(b) }
2592func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
2593func (b byCol0) Less(i, j int) bool {
2594	switch a := b[i][0].(type) {
2595	case string:
2596		return a < b[j][0].(string)
2597	case civil.Date:
2598		return a.Before(b[j][0].(civil.Date))
2599	default:
2600		panic("unknown type")
2601	}
2602}
2603
2604func hasStatusCode(err error, code int) bool {
2605	if e, ok := err.(*googleapi.Error); ok && e.Code == code {
2606		return true
2607	}
2608	return false
2609}
2610
2611// wait polls the job until it is complete or an error is returned.
2612func wait(ctx context.Context, job *Job) error {
2613	status, err := job.Wait(ctx)
2614	if err != nil {
2615		return err
2616	}
2617	if status.Err() != nil {
2618		return fmt.Errorf("job status error: %#v", status.Err())
2619	}
2620	if status.Statistics == nil {
2621		return errors.New("nil Statistics")
2622	}
2623	if status.Statistics.EndTime.IsZero() {
2624		return errors.New("EndTime is zero")
2625	}
2626	if status.Statistics.Details == nil {
2627		return errors.New("nil Statistics.Details")
2628	}
2629	return nil
2630}
2631
2632// waitForRow polls the table until it contains a row.
2633// TODO(jba): use internal.Retry.
2634func waitForRow(ctx context.Context, table *Table) error {
2635	for {
2636		it := table.Read(ctx)
2637		var v []Value
2638		err := it.Next(&v)
2639		if err == nil {
2640			return nil
2641		}
2642		if err != iterator.Done {
2643			return err
2644		}
2645		time.Sleep(1 * time.Second)
2646	}
2647}
2648
2649func putError(err error) string {
2650	pme, ok := err.(PutMultiError)
2651	if !ok {
2652		return err.Error()
2653	}
2654	var msgs []string
2655	for _, err := range pme {
2656		msgs = append(msgs, err.Error())
2657	}
2658	return strings.Join(msgs, "\n")
2659}
2660