1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"encoding/json"
20	"errors"
21	"flag"
22	"fmt"
23	"log"
24	"math/big"
25	"net/http"
26	"os"
27	"sort"
28	"strings"
29	"testing"
30	"time"
31
32	"cloud.google.com/go/civil"
33	"cloud.google.com/go/httpreplay"
34	"cloud.google.com/go/internal"
35	"cloud.google.com/go/internal/pretty"
36	"cloud.google.com/go/internal/testutil"
37	"cloud.google.com/go/internal/uid"
38	"cloud.google.com/go/storage"
39	"github.com/google/go-cmp/cmp"
40	"github.com/google/go-cmp/cmp/cmpopts"
41	gax "github.com/googleapis/gax-go/v2"
42	"google.golang.org/api/googleapi"
43	"google.golang.org/api/iterator"
44	"google.golang.org/api/option"
45)
46
47const replayFilename = "bigquery.replay"
48
49var record = flag.Bool("record", false, "record RPCs")
50
51var (
52	client        *Client
53	storageClient *storage.Client
54	dataset       *Dataset
55	schema        = Schema{
56		{Name: "name", Type: StringFieldType},
57		{Name: "nums", Type: IntegerFieldType, Repeated: true},
58		{Name: "rec", Type: RecordFieldType, Schema: Schema{
59			{Name: "bool", Type: BooleanFieldType},
60		}},
61	}
62	testTableExpiration                        time.Time
63	datasetIDs, tableIDs, modelIDs, routineIDs *uid.Space
64)
65
66// Note: integration tests cannot be run in parallel, because TestIntegration_Location
67// modifies the client.
68
69func TestMain(m *testing.M) {
70	cleanup := initIntegrationTest()
71	r := m.Run()
72	cleanup()
73	os.Exit(r)
74}
75
76func getClient(t *testing.T) *Client {
77	if client == nil {
78		t.Skip("Integration tests skipped")
79	}
80	return client
81}
82
83var grpcHeadersChecker = testutil.DefaultHeadersEnforcer()
84
85// If integration tests will be run, create a unique dataset for them.
86// Return a cleanup function.
87func initIntegrationTest() func() {
88	ctx := context.Background()
89	flag.Parse() // needed for testing.Short()
90	projID := testutil.ProjID()
91	switch {
92	case testing.Short() && *record:
93		log.Fatal("cannot combine -short and -record")
94		return func() {}
95
96	case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "":
97		// go test -short with a replay file will replay the integration tests if the
98		// environment variables are set.
99		log.Printf("replaying from %s", replayFilename)
100		httpreplay.DebugHeaders()
101		replayer, err := httpreplay.NewReplayer(replayFilename)
102		if err != nil {
103			log.Fatal(err)
104		}
105		var t time.Time
106		if err := json.Unmarshal(replayer.Initial(), &t); err != nil {
107			log.Fatal(err)
108		}
109		hc, err := replayer.Client(ctx) // no creds needed
110		if err != nil {
111			log.Fatal(err)
112		}
113		copts := append(grpcHeadersChecker.CallOptions(), option.WithHTTPClient(hc))
114		client, err = NewClient(ctx, projID, copts...)
115		if err != nil {
116			log.Fatal(err)
117		}
118		storageClient, err = storage.NewClient(ctx, copts...)
119		if err != nil {
120			log.Fatal(err)
121		}
122		cleanup := initTestState(client, t)
123		return func() {
124			cleanup()
125			_ = replayer.Close() // No actionable error returned.
126		}
127
128	case testing.Short():
129		// go test -short without a replay file skips the integration tests.
130		if testutil.CanReplay(replayFilename) && projID != "" {
131			log.Print("replay not supported for Go versions before 1.8")
132		}
133		client = nil
134		storageClient = nil
135		return func() {}
136
137	default: // Run integration tests against a real backend.
138		ts := testutil.TokenSource(ctx, Scope)
139		if ts == nil {
140			log.Println("Integration tests skipped. See CONTRIBUTING.md for details")
141			return func() {}
142		}
143		bqOpts := append(grpcHeadersChecker.CallOptions(), option.WithTokenSource(ts))
144		sOpts := append(grpcHeadersChecker.CallOptions(), option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl)))
145		cleanup := func() {}
146		now := time.Now().UTC()
147		if *record {
148			if !httpreplay.Supported() {
149				log.Print("record not supported for Go versions before 1.8")
150			} else {
151				nowBytes, err := json.Marshal(now)
152				if err != nil {
153					log.Fatal(err)
154				}
155				recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes)
156				if err != nil {
157					log.Fatalf("could not record: %v", err)
158				}
159				log.Printf("recording to %s", replayFilename)
160				hc, err := recorder.Client(ctx, bqOpts...)
161				if err != nil {
162					log.Fatal(err)
163				}
164				bqOpts = append(grpcHeadersChecker.CallOptions(), option.WithHTTPClient(hc))
165				hc, err = recorder.Client(ctx, sOpts...)
166				if err != nil {
167					log.Fatal(err)
168				}
169				sOpts = append(grpcHeadersChecker.CallOptions(), option.WithHTTPClient(hc))
170				cleanup = func() {
171					if err := recorder.Close(); err != nil {
172						log.Printf("saving recording: %v", err)
173					}
174				}
175			}
176		}
177		var err error
178		client, err = NewClient(ctx, projID, bqOpts...)
179		if err != nil {
180			log.Fatalf("NewClient: %v", err)
181		}
182		storageClient, err = storage.NewClient(ctx, sOpts...)
183		if err != nil {
184			log.Fatalf("storage.NewClient: %v", err)
185		}
186		c := initTestState(client, now)
187		return func() { c(); cleanup() }
188	}
189}
190
191func initTestState(client *Client, t time.Time) func() {
192	// BigQuery does not accept hyphens in dataset or table IDs, so we create IDs
193	// with underscores.
194	ctx := context.Background()
195	opts := &uid.Options{Sep: '_', Time: t}
196	datasetIDs = uid.NewSpace("dataset", opts)
197	tableIDs = uid.NewSpace("table", opts)
198	modelIDs = uid.NewSpace("model", opts)
199	routineIDs = uid.NewSpace("routine", opts)
200	testTableExpiration = t.Add(10 * time.Minute).Round(time.Second)
201	// For replayability, seed the random source with t.
202	Seed(t.UnixNano())
203
204	dataset = client.Dataset(datasetIDs.New())
205	if err := dataset.Create(ctx, nil); err != nil {
206		log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err)
207	}
208	return func() {
209		if err := dataset.DeleteWithContents(ctx); err != nil {
210			log.Printf("could not delete %s", dataset.DatasetID)
211		}
212	}
213}
214
215func TestIntegration_TableCreate(t *testing.T) {
216	// Check that creating a record field with an empty schema is an error.
217	if client == nil {
218		t.Skip("Integration tests skipped")
219	}
220	table := dataset.Table("t_bad")
221	schema := Schema{
222		{Name: "rec", Type: RecordFieldType, Schema: Schema{}},
223	}
224	err := table.Create(context.Background(), &TableMetadata{
225		Schema:         schema,
226		ExpirationTime: testTableExpiration.Add(5 * time.Minute),
227	})
228	if err == nil {
229		t.Fatal("want error, got nil")
230	}
231	if !hasStatusCode(err, http.StatusBadRequest) {
232		t.Fatalf("want a 400 error, got %v", err)
233	}
234}
235
236func TestIntegration_TableCreateView(t *testing.T) {
237	if client == nil {
238		t.Skip("Integration tests skipped")
239	}
240	ctx := context.Background()
241	table := newTable(t, schema)
242	defer table.Delete(ctx)
243
244	// Test that standard SQL views work.
245	view := dataset.Table("t_view_standardsql")
246	query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`",
247		dataset.ProjectID, dataset.DatasetID, table.TableID)
248	err := view.Create(context.Background(), &TableMetadata{
249		ViewQuery:      query,
250		UseStandardSQL: true,
251	})
252	if err != nil {
253		t.Fatalf("table.create: Did not expect an error, got: %v", err)
254	}
255	if err := view.Delete(ctx); err != nil {
256		t.Fatal(err)
257	}
258}
259
260func TestIntegration_TableMetadata(t *testing.T) {
261
262	if client == nil {
263		t.Skip("Integration tests skipped")
264	}
265	ctx := context.Background()
266	table := newTable(t, schema)
267	defer table.Delete(ctx)
268	// Check table metadata.
269	md, err := table.Metadata(ctx)
270	if err != nil {
271		t.Fatal(err)
272	}
273	// TODO(jba): check md more thorougly.
274	if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want {
275		t.Errorf("metadata.FullID: got %q, want %q", got, want)
276	}
277	if got, want := md.Type, RegularTable; got != want {
278		t.Errorf("metadata.Type: got %v, want %v", got, want)
279	}
280	if got, want := md.ExpirationTime, testTableExpiration; !got.Equal(want) {
281		t.Errorf("metadata.Type: got %v, want %v", got, want)
282	}
283
284	// Check that timePartitioning is nil by default
285	if md.TimePartitioning != nil {
286		t.Errorf("metadata.TimePartitioning: got %v, want %v", md.TimePartitioning, nil)
287	}
288
289	// Create tables that have time partitioning
290	partitionCases := []struct {
291		timePartitioning TimePartitioning
292		wantExpiration   time.Duration
293		wantField        string
294		wantPruneFilter  bool
295	}{
296		{TimePartitioning{}, time.Duration(0), "", false},
297		{TimePartitioning{Expiration: time.Second}, time.Second, "", false},
298		{TimePartitioning{RequirePartitionFilter: true}, time.Duration(0), "", true},
299		{
300			TimePartitioning{
301				Expiration:             time.Second,
302				Field:                  "date",
303				RequirePartitionFilter: true,
304			}, time.Second, "date", true},
305	}
306
307	schema2 := Schema{
308		{Name: "name", Type: StringFieldType},
309		{Name: "date", Type: DateFieldType},
310	}
311
312	clustering := &Clustering{
313		Fields: []string{"name"},
314	}
315
316	// Currently, clustering depends on partitioning.  Interleave testing of the two features.
317	for i, c := range partitionCases {
318		table := dataset.Table(fmt.Sprintf("t_metadata_partition_nocluster_%v", i))
319		clusterTable := dataset.Table(fmt.Sprintf("t_metadata_partition_cluster_%v", i))
320
321		// Create unclustered, partitioned variant and get metadata.
322		err = table.Create(context.Background(), &TableMetadata{
323			Schema:           schema2,
324			TimePartitioning: &c.timePartitioning,
325			ExpirationTime:   testTableExpiration,
326		})
327		if err != nil {
328			t.Fatal(err)
329		}
330		defer table.Delete(ctx)
331		md, err := table.Metadata(ctx)
332		if err != nil {
333			t.Fatal(err)
334		}
335
336		// Created clustered table and get metadata.
337		err = clusterTable.Create(context.Background(), &TableMetadata{
338			Schema:           schema2,
339			TimePartitioning: &c.timePartitioning,
340			ExpirationTime:   testTableExpiration,
341			Clustering:       clustering,
342		})
343		if err != nil {
344			t.Fatal(err)
345		}
346		clusterMD, err := clusterTable.Metadata(ctx)
347		if err != nil {
348			t.Fatal(err)
349		}
350
351		for _, v := range []*TableMetadata{md, clusterMD} {
352			got := v.TimePartitioning
353			want := &TimePartitioning{
354				Expiration:             c.wantExpiration,
355				Field:                  c.wantField,
356				RequirePartitionFilter: c.wantPruneFilter,
357			}
358			if !testutil.Equal(got, want) {
359				t.Errorf("metadata.TimePartitioning: got %v, want %v", got, want)
360			}
361			// Manipulate RequirePartitionFilter at the table level.
362			mdUpdate := TableMetadataToUpdate{
363				RequirePartitionFilter: !want.RequirePartitionFilter,
364			}
365
366			newmd, err := table.Update(ctx, mdUpdate, "")
367			if err != nil {
368				t.Errorf("failed to invert RequirePartitionFilter on %s: %v", table.FullyQualifiedName(), err)
369			}
370			if newmd.RequirePartitionFilter == want.RequirePartitionFilter {
371				t.Errorf("inverting table-level RequirePartitionFilter on %s failed, want %t got %t", table.FullyQualifiedName(), !want.RequirePartitionFilter, newmd.RequirePartitionFilter)
372			}
373			// Also verify that the clone of RequirePartitionFilter in the TimePartitioning message is consistent.
374			if newmd.RequirePartitionFilter != newmd.TimePartitioning.RequirePartitionFilter {
375				t.Errorf("inconsistent RequirePartitionFilter.  Table: %t, TimePartitioning: %t", newmd.RequirePartitionFilter, newmd.TimePartitioning.RequirePartitionFilter)
376			}
377
378		}
379
380		if md.Clustering != nil {
381			t.Errorf("metadata.Clustering was not nil on unclustered table %s", table.TableID)
382		}
383		got := clusterMD.Clustering
384		want := clustering
385		if clusterMD.Clustering != clustering {
386			if !testutil.Equal(got, want) {
387				t.Errorf("metadata.Clustering: got %v, want %v", got, want)
388			}
389		}
390	}
391
392}
393
394func TestIntegration_RemoveTimePartitioning(t *testing.T) {
395	if client == nil {
396		t.Skip("Integration tests skipped")
397	}
398	ctx := context.Background()
399	table := dataset.Table(tableIDs.New())
400	want := 24 * time.Hour
401	err := table.Create(ctx, &TableMetadata{
402		ExpirationTime: testTableExpiration,
403		TimePartitioning: &TimePartitioning{
404			Expiration: want,
405		},
406	})
407	if err != nil {
408		t.Fatal(err)
409	}
410	defer table.Delete(ctx)
411
412	md, err := table.Metadata(ctx)
413	if err != nil {
414		t.Fatal(err)
415	}
416	if got := md.TimePartitioning.Expiration; got != want {
417		t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
418	}
419
420	// Remove time partitioning expiration
421	md, err = table.Update(context.Background(), TableMetadataToUpdate{
422		TimePartitioning: &TimePartitioning{Expiration: 0},
423	}, md.ETag)
424	if err != nil {
425		t.Fatal(err)
426	}
427
428	want = time.Duration(0)
429	if got := md.TimePartitioning.Expiration; got != want {
430		t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
431	}
432}
433
434func TestIntegration_DatasetCreate(t *testing.T) {
435	if client == nil {
436		t.Skip("Integration tests skipped")
437	}
438	ctx := context.Background()
439	ds := client.Dataset(datasetIDs.New())
440	wmd := &DatasetMetadata{Name: "name", Location: "EU"}
441	err := ds.Create(ctx, wmd)
442	if err != nil {
443		t.Fatal(err)
444	}
445	gmd, err := ds.Metadata(ctx)
446	if err != nil {
447		t.Fatal(err)
448	}
449	if got, want := gmd.Name, wmd.Name; got != want {
450		t.Errorf("name: got %q, want %q", got, want)
451	}
452	if got, want := gmd.Location, wmd.Location; got != want {
453		t.Errorf("location: got %q, want %q", got, want)
454	}
455	if err := ds.Delete(ctx); err != nil {
456		t.Fatalf("deleting dataset %v: %v", ds, err)
457	}
458}
459
460func TestIntegration_DatasetMetadata(t *testing.T) {
461	if client == nil {
462		t.Skip("Integration tests skipped")
463	}
464	ctx := context.Background()
465	md, err := dataset.Metadata(ctx)
466	if err != nil {
467		t.Fatal(err)
468	}
469	if got, want := md.FullID, fmt.Sprintf("%s:%s", dataset.ProjectID, dataset.DatasetID); got != want {
470		t.Errorf("FullID: got %q, want %q", got, want)
471	}
472	jan2016 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
473	if md.CreationTime.Before(jan2016) {
474		t.Errorf("CreationTime: got %s, want > 2016-1-1", md.CreationTime)
475	}
476	if md.LastModifiedTime.Before(jan2016) {
477		t.Errorf("LastModifiedTime: got %s, want > 2016-1-1", md.LastModifiedTime)
478	}
479
480	// Verify that we get a NotFound for a nonexistent dataset.
481	_, err = client.Dataset("does_not_exist").Metadata(ctx)
482	if err == nil || !hasStatusCode(err, http.StatusNotFound) {
483		t.Errorf("got %v, want NotFound error", err)
484	}
485}
486
487func TestIntegration_DatasetDelete(t *testing.T) {
488	if client == nil {
489		t.Skip("Integration tests skipped")
490	}
491	ctx := context.Background()
492	ds := client.Dataset(datasetIDs.New())
493	if err := ds.Create(ctx, nil); err != nil {
494		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
495	}
496	if err := ds.Delete(ctx); err != nil {
497		t.Fatalf("deleting dataset %s: %v", ds.DatasetID, err)
498	}
499}
500
501func TestIntegration_DatasetDeleteWithContents(t *testing.T) {
502	if client == nil {
503		t.Skip("Integration tests skipped")
504	}
505	ctx := context.Background()
506	ds := client.Dataset(datasetIDs.New())
507	if err := ds.Create(ctx, nil); err != nil {
508		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
509	}
510	table := ds.Table(tableIDs.New())
511	if err := table.Create(ctx, nil); err != nil {
512		t.Fatalf("creating table %s in dataset %s: %v", table.TableID, table.DatasetID, err)
513	}
514	// We expect failure here
515	if err := ds.Delete(ctx); err == nil {
516		t.Fatalf("non-recursive delete of dataset %s succeeded unexpectedly.", ds.DatasetID)
517	}
518	if err := ds.DeleteWithContents(ctx); err != nil {
519		t.Fatalf("deleting recursively dataset %s: %v", ds.DatasetID, err)
520	}
521}
522
523func TestIntegration_DatasetUpdateETags(t *testing.T) {
524	if client == nil {
525		t.Skip("Integration tests skipped")
526	}
527
528	check := func(md *DatasetMetadata, wantDesc, wantName string) {
529		if md.Description != wantDesc {
530			t.Errorf("description: got %q, want %q", md.Description, wantDesc)
531		}
532		if md.Name != wantName {
533			t.Errorf("name: got %q, want %q", md.Name, wantName)
534		}
535	}
536
537	ctx := context.Background()
538	md, err := dataset.Metadata(ctx)
539	if err != nil {
540		t.Fatal(err)
541	}
542	if md.ETag == "" {
543		t.Fatal("empty ETag")
544	}
545	// Write without ETag succeeds.
546	desc := md.Description + "d2"
547	name := md.Name + "n2"
548	md2, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: desc, Name: name}, "")
549	if err != nil {
550		t.Fatal(err)
551	}
552	check(md2, desc, name)
553
554	// Write with original ETag fails because of intervening write.
555	_, err = dataset.Update(ctx, DatasetMetadataToUpdate{Description: "d", Name: "n"}, md.ETag)
556	if err == nil {
557		t.Fatal("got nil, want error")
558	}
559
560	// Write with most recent ETag succeeds.
561	md3, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: "", Name: ""}, md2.ETag)
562	if err != nil {
563		t.Fatal(err)
564	}
565	check(md3, "", "")
566}
567
568func TestIntegration_DatasetUpdateDefaultExpiration(t *testing.T) {
569	if client == nil {
570		t.Skip("Integration tests skipped")
571	}
572	ctx := context.Background()
573	_, err := dataset.Metadata(ctx)
574	if err != nil {
575		t.Fatal(err)
576	}
577	// Set the default expiration time.
578	md, err := dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Hour}, "")
579	if err != nil {
580		t.Fatal(err)
581	}
582	if md.DefaultTableExpiration != time.Hour {
583		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
584	}
585	// Omitting DefaultTableExpiration doesn't change it.
586	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{Name: "xyz"}, "")
587	if err != nil {
588		t.Fatal(err)
589	}
590	if md.DefaultTableExpiration != time.Hour {
591		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
592	}
593	// Setting it to 0 deletes it (which looks like a 0 duration).
594	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Duration(0)}, "")
595	if err != nil {
596		t.Fatal(err)
597	}
598	if md.DefaultTableExpiration != 0 {
599		t.Fatalf("got %s, want 0", md.DefaultTableExpiration)
600	}
601}
602
603func TestIntegration_DatasetUpdateAccess(t *testing.T) {
604	if client == nil {
605		t.Skip("Integration tests skipped")
606	}
607	ctx := context.Background()
608	md, err := dataset.Metadata(ctx)
609	if err != nil {
610		t.Fatal(err)
611	}
612	origAccess := append([]*AccessEntry(nil), md.Access...)
613	newEntry := &AccessEntry{
614		Role:       ReaderRole,
615		Entity:     "Joe@example.com",
616		EntityType: UserEmailEntity,
617	}
618	newAccess := append(md.Access, newEntry)
619	dm := DatasetMetadataToUpdate{Access: newAccess}
620	md, err = dataset.Update(ctx, dm, md.ETag)
621	if err != nil {
622		t.Fatal(err)
623	}
624	defer func() {
625		_, err := dataset.Update(ctx, DatasetMetadataToUpdate{Access: origAccess}, md.ETag)
626		if err != nil {
627			t.Log("could not restore dataset access list")
628		}
629	}()
630	if diff := testutil.Diff(md.Access, newAccess); diff != "" {
631		t.Fatalf("got=-, want=+:\n%s", diff)
632	}
633}
634
635func TestIntegration_DatasetUpdateLabels(t *testing.T) {
636	if client == nil {
637		t.Skip("Integration tests skipped")
638	}
639	ctx := context.Background()
640	_, err := dataset.Metadata(ctx)
641	if err != nil {
642		t.Fatal(err)
643	}
644	var dm DatasetMetadataToUpdate
645	dm.SetLabel("label", "value")
646	md, err := dataset.Update(ctx, dm, "")
647	if err != nil {
648		t.Fatal(err)
649	}
650	if got, want := md.Labels["label"], "value"; got != want {
651		t.Errorf("got %q, want %q", got, want)
652	}
653	dm = DatasetMetadataToUpdate{}
654	dm.DeleteLabel("label")
655	md, err = dataset.Update(ctx, dm, "")
656	if err != nil {
657		t.Fatal(err)
658	}
659	if _, ok := md.Labels["label"]; ok {
660		t.Error("label still present after deletion")
661	}
662}
663
664func TestIntegration_TableUpdateLabels(t *testing.T) {
665	if client == nil {
666		t.Skip("Integration tests skipped")
667	}
668	ctx := context.Background()
669	table := newTable(t, schema)
670	defer table.Delete(ctx)
671
672	var tm TableMetadataToUpdate
673	tm.SetLabel("label", "value")
674	md, err := table.Update(ctx, tm, "")
675	if err != nil {
676		t.Fatal(err)
677	}
678	if got, want := md.Labels["label"], "value"; got != want {
679		t.Errorf("got %q, want %q", got, want)
680	}
681	tm = TableMetadataToUpdate{}
682	tm.DeleteLabel("label")
683	md, err = table.Update(ctx, tm, "")
684	if err != nil {
685		t.Fatal(err)
686	}
687	if _, ok := md.Labels["label"]; ok {
688		t.Error("label still present after deletion")
689	}
690}
691
692func TestIntegration_Tables(t *testing.T) {
693	if client == nil {
694		t.Skip("Integration tests skipped")
695	}
696	ctx := context.Background()
697	table := newTable(t, schema)
698	defer table.Delete(ctx)
699	wantName := table.FullyQualifiedName()
700
701	// This test is flaky due to eventual consistency.
702	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
703	defer cancel()
704	err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
705		// Iterate over tables in the dataset.
706		it := dataset.Tables(ctx)
707		var tableNames []string
708		for {
709			tbl, err := it.Next()
710			if err == iterator.Done {
711				break
712			}
713			if err != nil {
714				return false, err
715			}
716			tableNames = append(tableNames, tbl.FullyQualifiedName())
717		}
718		// Other tests may be running with this dataset, so there might be more
719		// than just our table in the list. So don't try for an exact match; just
720		// make sure that our table is there somewhere.
721		for _, tn := range tableNames {
722			if tn == wantName {
723				return true, nil
724			}
725		}
726		return false, fmt.Errorf("got %v\nwant %s in the list", tableNames, wantName)
727	})
728	if err != nil {
729		t.Fatal(err)
730	}
731}
732
733func TestIntegration_InsertAndRead(t *testing.T) {
734	if client == nil {
735		t.Skip("Integration tests skipped")
736	}
737	ctx := context.Background()
738	table := newTable(t, schema)
739	defer table.Delete(ctx)
740
741	// Populate the table.
742	ins := table.Inserter()
743	var (
744		wantRows  [][]Value
745		saverRows []*ValuesSaver
746	)
747	for i, name := range []string{"a", "b", "c"} {
748		row := []Value{name, []Value{int64(i)}, []Value{true}}
749		wantRows = append(wantRows, row)
750		saverRows = append(saverRows, &ValuesSaver{
751			Schema:   schema,
752			InsertID: name,
753			Row:      row,
754		})
755	}
756	if err := ins.Put(ctx, saverRows); err != nil {
757		t.Fatal(putError(err))
758	}
759
760	// Wait until the data has been uploaded. This can take a few seconds, according
761	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
762	if err := waitForRow(ctx, table); err != nil {
763		t.Fatal(err)
764	}
765	// Read the table.
766	checkRead(t, "upload", table.Read(ctx), wantRows)
767
768	// Query the table.
769	q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID))
770	q.DefaultProjectID = dataset.ProjectID
771	q.DefaultDatasetID = dataset.DatasetID
772
773	rit, err := q.Read(ctx)
774	if err != nil {
775		t.Fatal(err)
776	}
777	checkRead(t, "query", rit, wantRows)
778
779	// Query the long way.
780	job1, err := q.Run(ctx)
781	if err != nil {
782		t.Fatal(err)
783	}
784	if job1.LastStatus() == nil {
785		t.Error("no LastStatus")
786	}
787	job2, err := client.JobFromID(ctx, job1.ID())
788	if err != nil {
789		t.Fatal(err)
790	}
791	if job2.LastStatus() == nil {
792		t.Error("no LastStatus")
793	}
794	rit, err = job2.Read(ctx)
795	if err != nil {
796		t.Fatal(err)
797	}
798	checkRead(t, "job.Read", rit, wantRows)
799
800	// Get statistics.
801	jobStatus, err := job2.Status(ctx)
802	if err != nil {
803		t.Fatal(err)
804	}
805	if jobStatus.Statistics == nil {
806		t.Fatal("jobStatus missing statistics")
807	}
808	if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok {
809		t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details)
810	}
811
812	// Test reading directly into a []Value.
813	valueLists, schema, _, err := readAll(table.Read(ctx))
814	if err != nil {
815		t.Fatal(err)
816	}
817	it := table.Read(ctx)
818	for i, vl := range valueLists {
819		var got []Value
820		if err := it.Next(&got); err != nil {
821			t.Fatal(err)
822		}
823		if !testutil.Equal(it.Schema, schema) {
824			t.Fatalf("got schema %v, want %v", it.Schema, schema)
825		}
826		want := []Value(vl)
827		if !testutil.Equal(got, want) {
828			t.Errorf("%d: got %v, want %v", i, got, want)
829		}
830	}
831
832	// Test reading into a map.
833	it = table.Read(ctx)
834	for _, vl := range valueLists {
835		var vm map[string]Value
836		if err := it.Next(&vm); err != nil {
837			t.Fatal(err)
838		}
839		if got, want := len(vm), len(vl); got != want {
840			t.Fatalf("valueMap len: got %d, want %d", got, want)
841		}
842		// With maps, structs become nested maps.
843		vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]}
844		for i, v := range vl {
845			if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) {
846				t.Errorf("%d, name=%s: got %#v, want %#v",
847					i, schema[i].Name, got, want)
848			}
849		}
850	}
851
852}
853
854type SubSubTestStruct struct {
855	Integer int64
856}
857
858type SubTestStruct struct {
859	String      string
860	Record      SubSubTestStruct
861	RecordArray []SubSubTestStruct
862}
863
864type TestStruct struct {
865	Name      string
866	Bytes     []byte
867	Integer   int64
868	Float     float64
869	Boolean   bool
870	Timestamp time.Time
871	Date      civil.Date
872	Time      civil.Time
873	DateTime  civil.DateTime
874	Numeric   *big.Rat
875	Geography string
876
877	StringArray    []string
878	IntegerArray   []int64
879	FloatArray     []float64
880	BooleanArray   []bool
881	TimestampArray []time.Time
882	DateArray      []civil.Date
883	TimeArray      []civil.Time
884	DateTimeArray  []civil.DateTime
885	NumericArray   []*big.Rat
886	GeographyArray []string
887
888	Record      SubTestStruct
889	RecordArray []SubTestStruct
890}
891
892// Round times to the microsecond for comparison purposes.
893var roundToMicros = cmp.Transformer("RoundToMicros",
894	func(t time.Time) time.Time { return t.Round(time.Microsecond) })
895
896func TestIntegration_InsertAndReadStructs(t *testing.T) {
897	if client == nil {
898		t.Skip("Integration tests skipped")
899	}
900	schema, err := InferSchema(TestStruct{})
901	if err != nil {
902		t.Fatal(err)
903	}
904
905	ctx := context.Background()
906	table := newTable(t, schema)
907	defer table.Delete(ctx)
908
909	d := civil.Date{Year: 2016, Month: 3, Day: 20}
910	tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
911	ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC)
912	dtm := civil.DateTime{Date: d, Time: tm}
913	d2 := civil.Date{Year: 1994, Month: 5, Day: 15}
914	tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0}
915	ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC)
916	dtm2 := civil.DateTime{Date: d2, Time: tm2}
917	g := "POINT(-122.350220 47.649154)"
918	g2 := "POINT(-122.0836791 37.421827)"
919
920	// Populate the table.
921	ins := table.Inserter()
922	want := []*TestStruct{
923		{
924			"a",
925			[]byte("byte"),
926			42,
927			3.14,
928			true,
929			ts,
930			d,
931			tm,
932			dtm,
933			big.NewRat(57, 100),
934			g,
935			[]string{"a", "b"},
936			[]int64{1, 2},
937			[]float64{1, 1.41},
938			[]bool{true, false},
939			[]time.Time{ts, ts2},
940			[]civil.Date{d, d2},
941			[]civil.Time{tm, tm2},
942			[]civil.DateTime{dtm, dtm2},
943			[]*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)},
944			[]string{g, g2},
945			SubTestStruct{
946				"string",
947				SubSubTestStruct{24},
948				[]SubSubTestStruct{{1}, {2}},
949			},
950			[]SubTestStruct{
951				{String: "empty"},
952				{
953					"full",
954					SubSubTestStruct{1},
955					[]SubSubTestStruct{{1}, {2}},
956				},
957			},
958		},
959		{
960			Name:      "b",
961			Bytes:     []byte("byte2"),
962			Integer:   24,
963			Float:     4.13,
964			Boolean:   false,
965			Timestamp: ts,
966			Date:      d,
967			Time:      tm,
968			DateTime:  dtm,
969			Numeric:   big.NewRat(4499, 10000),
970		},
971	}
972	var savers []*StructSaver
973	for _, s := range want {
974		savers = append(savers, &StructSaver{Schema: schema, Struct: s})
975	}
976	if err := ins.Put(ctx, savers); err != nil {
977		t.Fatal(putError(err))
978	}
979
980	// Wait until the data has been uploaded. This can take a few seconds, according
981	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
982	if err := waitForRow(ctx, table); err != nil {
983		t.Fatal(err)
984	}
985
986	// Test iteration with structs.
987	it := table.Read(ctx)
988	var got []*TestStruct
989	for {
990		var g TestStruct
991		err := it.Next(&g)
992		if err == iterator.Done {
993			break
994		}
995		if err != nil {
996			t.Fatal(err)
997		}
998		got = append(got, &g)
999	}
1000	sort.Sort(byName(got))
1001
1002	// BigQuery does not elide nils. It reports an error for nil fields.
1003	for i, g := range got {
1004		if i >= len(want) {
1005			t.Errorf("%d: got %v, past end of want", i, pretty.Value(g))
1006		} else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" {
1007			t.Errorf("%d: got=-, want=+:\n%s", i, diff)
1008		}
1009	}
1010}
1011
1012type byName []*TestStruct
1013
1014func (b byName) Len() int           { return len(b) }
1015func (b byName) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1016func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name }
1017
1018func TestIntegration_InsertAndReadNullable(t *testing.T) {
1019	if client == nil {
1020		t.Skip("Integration tests skipped")
1021	}
1022	ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
1023	cdt := civil.DateTime{Date: testDate, Time: ctm}
1024	rat := big.NewRat(33, 100)
1025	geo := "POINT(-122.198939 47.669865)"
1026
1027	// Nil fields in the struct.
1028	testInsertAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema)))
1029
1030	// Explicitly invalidate the Null* types within the struct.
1031	testInsertAndReadNullable(t, testStructNullable{
1032		String:    NullString{Valid: false},
1033		Integer:   NullInt64{Valid: false},
1034		Float:     NullFloat64{Valid: false},
1035		Boolean:   NullBool{Valid: false},
1036		Timestamp: NullTimestamp{Valid: false},
1037		Date:      NullDate{Valid: false},
1038		Time:      NullTime{Valid: false},
1039		DateTime:  NullDateTime{Valid: false},
1040		Geography: NullGeography{Valid: false},
1041	},
1042		make([]Value, len(testStructNullableSchema)))
1043
1044	// Populate the struct with values.
1045	testInsertAndReadNullable(t, testStructNullable{
1046		String:    NullString{"x", true},
1047		Bytes:     []byte{1, 2, 3},
1048		Integer:   NullInt64{1, true},
1049		Float:     NullFloat64{2.3, true},
1050		Boolean:   NullBool{true, true},
1051		Timestamp: NullTimestamp{testTimestamp, true},
1052		Date:      NullDate{testDate, true},
1053		Time:      NullTime{ctm, true},
1054		DateTime:  NullDateTime{cdt, true},
1055		Numeric:   rat,
1056		Geography: NullGeography{geo, true},
1057		Record:    &subNullable{X: NullInt64{4, true}},
1058	},
1059		[]Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, geo, []Value{int64(4)}})
1060}
1061
1062func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
1063	ctx := context.Background()
1064	table := newTable(t, testStructNullableSchema)
1065	defer table.Delete(ctx)
1066
1067	// Populate the table.
1068	ins := table.Inserter()
1069	if err := ins.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil {
1070		t.Fatal(putError(err))
1071	}
1072	// Wait until the data has been uploaded. This can take a few seconds, according
1073	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
1074	if err := waitForRow(ctx, table); err != nil {
1075		t.Fatal(err)
1076	}
1077
1078	// Read into a []Value.
1079	iter := table.Read(ctx)
1080	gotRows, _, _, err := readAll(iter)
1081	if err != nil {
1082		t.Fatal(err)
1083	}
1084	if len(gotRows) != 1 {
1085		t.Fatalf("got %d rows, want 1", len(gotRows))
1086	}
1087	if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" {
1088		t.Error(diff)
1089	}
1090
1091	// Read into a struct.
1092	want := ts
1093	var sn testStructNullable
1094	it := table.Read(ctx)
1095	if err := it.Next(&sn); err != nil {
1096		t.Fatal(err)
1097	}
1098	if diff := testutil.Diff(sn, want, roundToMicros); diff != "" {
1099		t.Error(diff)
1100	}
1101}
1102
1103func TestIntegration_TableUpdate(t *testing.T) {
1104	if client == nil {
1105		t.Skip("Integration tests skipped")
1106	}
1107	ctx := context.Background()
1108	table := newTable(t, schema)
1109	defer table.Delete(ctx)
1110
1111	// Test Update of non-schema fields.
1112	tm, err := table.Metadata(ctx)
1113	if err != nil {
1114		t.Fatal(err)
1115	}
1116	wantDescription := tm.Description + "more"
1117	wantName := tm.Name + "more"
1118	wantExpiration := tm.ExpirationTime.Add(time.Hour * 24)
1119	got, err := table.Update(ctx, TableMetadataToUpdate{
1120		Description:    wantDescription,
1121		Name:           wantName,
1122		ExpirationTime: wantExpiration,
1123	}, tm.ETag)
1124	if err != nil {
1125		t.Fatal(err)
1126	}
1127	if got.Description != wantDescription {
1128		t.Errorf("Description: got %q, want %q", got.Description, wantDescription)
1129	}
1130	if got.Name != wantName {
1131		t.Errorf("Name: got %q, want %q", got.Name, wantName)
1132	}
1133	if got.ExpirationTime != wantExpiration {
1134		t.Errorf("ExpirationTime: got %q, want %q", got.ExpirationTime, wantExpiration)
1135	}
1136	if !testutil.Equal(got.Schema, schema) {
1137		t.Errorf("Schema: got %v, want %v", pretty.Value(got.Schema), pretty.Value(schema))
1138	}
1139
1140	// Blind write succeeds.
1141	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "x"}, "")
1142	if err != nil {
1143		t.Fatal(err)
1144	}
1145	// Write with old etag fails.
1146	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "y"}, got.ETag)
1147	if err == nil {
1148		t.Fatal("Update with old ETag succeeded, wanted failure")
1149	}
1150
1151	// Test schema update.
1152	// Columns can be added. schema2 is the same as schema, except for the
1153	// added column in the middle.
1154	nested := Schema{
1155		{Name: "nested", Type: BooleanFieldType},
1156		{Name: "other", Type: StringFieldType},
1157	}
1158	schema2 := Schema{
1159		schema[0],
1160		{Name: "rec2", Type: RecordFieldType, Schema: nested},
1161		schema[1],
1162		schema[2],
1163	}
1164
1165	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: schema2}, "")
1166	if err != nil {
1167		t.Fatal(err)
1168	}
1169
1170	// Wherever you add the column, it appears at the end.
1171	schema3 := Schema{schema2[0], schema2[2], schema2[3], schema2[1]}
1172	if !testutil.Equal(got.Schema, schema3) {
1173		t.Errorf("add field:\ngot  %v\nwant %v",
1174			pretty.Value(got.Schema), pretty.Value(schema3))
1175	}
1176
1177	// Updating with the empty schema succeeds, but is a no-op.
1178	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema{}}, "")
1179	if err != nil {
1180		t.Fatal(err)
1181	}
1182	if !testutil.Equal(got.Schema, schema3) {
1183		t.Errorf("empty schema:\ngot  %v\nwant %v",
1184			pretty.Value(got.Schema), pretty.Value(schema3))
1185	}
1186
1187	// Error cases when updating schema.
1188	for _, test := range []struct {
1189		desc   string
1190		fields Schema
1191	}{
1192		{"change from optional to required", Schema{
1193			{Name: "name", Type: StringFieldType, Required: true},
1194			schema3[1],
1195			schema3[2],
1196			schema3[3],
1197		}},
1198		{"add a required field", Schema{
1199			schema3[0], schema3[1], schema3[2], schema3[3],
1200			{Name: "req", Type: StringFieldType, Required: true},
1201		}},
1202		{"remove a field", Schema{schema3[0], schema3[1], schema3[2]}},
1203		{"remove a nested field", Schema{
1204			schema3[0], schema3[1], schema3[2],
1205			{Name: "rec2", Type: RecordFieldType, Schema: Schema{nested[0]}}}},
1206		{"remove all nested fields", Schema{
1207			schema3[0], schema3[1], schema3[2],
1208			{Name: "rec2", Type: RecordFieldType, Schema: Schema{}}}},
1209	} {
1210		_, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema(test.fields)}, "")
1211		if err == nil {
1212			t.Errorf("%s: want error, got nil", test.desc)
1213		} else if !hasStatusCode(err, 400) {
1214			t.Errorf("%s: want 400, got %v", test.desc, err)
1215		}
1216	}
1217}
1218
1219func TestIntegration_Load(t *testing.T) {
1220	if client == nil {
1221		t.Skip("Integration tests skipped")
1222	}
1223	ctx := context.Background()
1224	// CSV data can't be loaded into a repeated field, so we use a different schema.
1225	table := newTable(t, Schema{
1226		{Name: "name", Type: StringFieldType},
1227		{Name: "nums", Type: IntegerFieldType},
1228	})
1229	defer table.Delete(ctx)
1230
1231	// Load the table from a reader.
1232	r := strings.NewReader("a,0\nb,1\nc,2\n")
1233	wantRows := [][]Value{
1234		{"a", int64(0)},
1235		{"b", int64(1)},
1236		{"c", int64(2)},
1237	}
1238	rs := NewReaderSource(r)
1239	loader := table.LoaderFrom(rs)
1240	loader.WriteDisposition = WriteTruncate
1241	loader.Labels = map[string]string{"test": "go"}
1242	job, err := loader.Run(ctx)
1243	if err != nil {
1244		t.Fatal(err)
1245	}
1246	if job.LastStatus() == nil {
1247		t.Error("no LastStatus")
1248	}
1249	conf, err := job.Config()
1250	if err != nil {
1251		t.Fatal(err)
1252	}
1253	config, ok := conf.(*LoadConfig)
1254	if !ok {
1255		t.Fatalf("got %T, want LoadConfig", conf)
1256	}
1257	diff := testutil.Diff(config, &loader.LoadConfig,
1258		cmp.AllowUnexported(Table{}),
1259		cmpopts.IgnoreUnexported(Client{}, ReaderSource{}),
1260		// returned schema is at top level, not in the config
1261		cmpopts.IgnoreFields(FileConfig{}, "Schema"))
1262	if diff != "" {
1263		t.Errorf("got=-, want=+:\n%s", diff)
1264	}
1265	if err := wait(ctx, job); err != nil {
1266		t.Fatal(err)
1267	}
1268	checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows)
1269
1270}
1271
1272func TestIntegration_DML(t *testing.T) {
1273	if client == nil {
1274		t.Skip("Integration tests skipped")
1275	}
1276	ctx := context.Background()
1277	table := newTable(t, schema)
1278	defer table.Delete(ctx)
1279
1280	sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec)
1281						VALUES ('a', [0], STRUCT<BOOL>(TRUE)),
1282							   ('b', [1], STRUCT<BOOL>(FALSE)),
1283							   ('c', [2], STRUCT<BOOL>(TRUE))`,
1284		table.DatasetID, table.TableID)
1285	if err := runQueryJob(ctx, sql); err != nil {
1286		t.Fatal(err)
1287	}
1288	wantRows := [][]Value{
1289		{"a", []Value{int64(0)}, []Value{true}},
1290		{"b", []Value{int64(1)}, []Value{false}},
1291		{"c", []Value{int64(2)}, []Value{true}},
1292	}
1293	checkRead(t, "DML", table.Read(ctx), wantRows)
1294}
1295
1296// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
1297func runQueryJob(ctx context.Context, sql string) error {
1298	return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
1299		job, err := client.Query(sql).Run(ctx)
1300		if err != nil {
1301			if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
1302				return true, err // fail on 4xx
1303			}
1304			return false, err
1305		}
1306		_, err = job.Wait(ctx)
1307		if err != nil {
1308			if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
1309				return true, err // fail on 4xx
1310			}
1311			return false, err
1312		}
1313		return true, nil
1314	})
1315}
1316
1317func TestIntegration_TimeTypes(t *testing.T) {
1318	if client == nil {
1319		t.Skip("Integration tests skipped")
1320	}
1321	ctx := context.Background()
1322	dtSchema := Schema{
1323		{Name: "d", Type: DateFieldType},
1324		{Name: "t", Type: TimeFieldType},
1325		{Name: "dt", Type: DateTimeFieldType},
1326		{Name: "ts", Type: TimestampFieldType},
1327	}
1328	table := newTable(t, dtSchema)
1329	defer table.Delete(ctx)
1330
1331	d := civil.Date{Year: 2016, Month: 3, Day: 20}
1332	tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000}
1333	dtm := civil.DateTime{Date: d, Time: tm}
1334	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1335	wantRows := [][]Value{
1336		{d, tm, dtm, ts},
1337	}
1338	ins := table.Inserter()
1339	if err := ins.Put(ctx, []*ValuesSaver{
1340		{Schema: dtSchema, Row: wantRows[0]},
1341	}); err != nil {
1342		t.Fatal(putError(err))
1343	}
1344	if err := waitForRow(ctx, table); err != nil {
1345		t.Fatal(err)
1346	}
1347
1348	// SQL wants DATETIMEs with a space between date and time, but the service
1349	// returns them in RFC3339 form, with a "T" between.
1350	query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+
1351		"VALUES ('%s', '%s', '%s', '%s')",
1352		table.DatasetID, table.TableID,
1353		d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
1354	if err := runQueryJob(ctx, query); err != nil {
1355		t.Fatal(err)
1356	}
1357	wantRows = append(wantRows, wantRows[0])
1358	checkRead(t, "TimeTypes", table.Read(ctx), wantRows)
1359}
1360
1361func TestIntegration_StandardQuery(t *testing.T) {
1362	if client == nil {
1363		t.Skip("Integration tests skipped")
1364	}
1365	ctx := context.Background()
1366
1367	d := civil.Date{Year: 2016, Month: 3, Day: 20}
1368	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0}
1369	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1370	dtm := ts.Format("2006-01-02 15:04:05")
1371
1372	// Constructs Value slices made up of int64s.
1373	ints := func(args ...int) []Value {
1374		vals := make([]Value, len(args))
1375		for i, arg := range args {
1376			vals[i] = int64(arg)
1377		}
1378		return vals
1379	}
1380
1381	testCases := []struct {
1382		query   string
1383		wantRow []Value
1384	}{
1385		{"SELECT 1", ints(1)},
1386		{"SELECT 1.3", []Value{1.3}},
1387		{"SELECT CAST(1.3  AS NUMERIC)", []Value{big.NewRat(13, 10)}},
1388		{"SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}},
1389		{"SELECT TRUE", []Value{true}},
1390		{"SELECT 'ABC'", []Value{"ABC"}},
1391		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
1392		{fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}},
1393		{fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}},
1394		{fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}},
1395		{fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}},
1396		{fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}},
1397		{fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}},
1398		{"SELECT (1, 2)", []Value{ints(1, 2)}},
1399		{"SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}},
1400		{"SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}},
1401		{"SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}},
1402		{"SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}},
1403		{"SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}},
1404	}
1405	for _, c := range testCases {
1406		q := client.Query(c.query)
1407		it, err := q.Read(ctx)
1408		if err != nil {
1409			t.Fatal(err)
1410		}
1411		checkRead(t, "StandardQuery", it, [][]Value{c.wantRow})
1412	}
1413}
1414
1415func TestIntegration_LegacyQuery(t *testing.T) {
1416	if client == nil {
1417		t.Skip("Integration tests skipped")
1418	}
1419	ctx := context.Background()
1420
1421	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1422	dtm := ts.Format("2006-01-02 15:04:05")
1423
1424	testCases := []struct {
1425		query   string
1426		wantRow []Value
1427	}{
1428		{"SELECT 1", []Value{int64(1)}},
1429		{"SELECT 1.3", []Value{1.3}},
1430		{"SELECT TRUE", []Value{true}},
1431		{"SELECT 'ABC'", []Value{"ABC"}},
1432		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
1433		{fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}},
1434		{fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}},
1435		{fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}},
1436	}
1437	for _, c := range testCases {
1438		q := client.Query(c.query)
1439		q.UseLegacySQL = true
1440		it, err := q.Read(ctx)
1441		if err != nil {
1442			t.Fatal(err)
1443		}
1444		checkRead(t, "LegacyQuery", it, [][]Value{c.wantRow})
1445	}
1446}
1447
1448func TestIntegration_QueryParameters(t *testing.T) {
1449	if client == nil {
1450		t.Skip("Integration tests skipped")
1451	}
1452	ctx := context.Background()
1453
1454	d := civil.Date{Year: 2016, Month: 3, Day: 20}
1455	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
1456	rtm := tm
1457	rtm.Nanosecond = 3000 // round to microseconds
1458	dtm := civil.DateTime{Date: d, Time: tm}
1459	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1460	rat := big.NewRat(13, 10)
1461
1462	type ss struct {
1463		String string
1464	}
1465
1466	type s struct {
1467		Timestamp      time.Time
1468		StringArray    []string
1469		SubStruct      ss
1470		SubStructArray []ss
1471	}
1472
1473	testCases := []struct {
1474		query      string
1475		parameters []QueryParameter
1476		wantRow    []Value
1477		wantConfig interface{}
1478	}{
1479		{
1480			"SELECT @val",
1481			[]QueryParameter{{"val", 1}},
1482			[]Value{int64(1)},
1483			int64(1),
1484		},
1485		{
1486			"SELECT @val",
1487			[]QueryParameter{{"val", 1.3}},
1488			[]Value{1.3},
1489			1.3,
1490		},
1491		{
1492			"SELECT @val",
1493			[]QueryParameter{{"val", rat}},
1494			[]Value{rat},
1495			rat,
1496		},
1497		{
1498			"SELECT @val",
1499			[]QueryParameter{{"val", true}},
1500			[]Value{true},
1501			true,
1502		},
1503		{
1504			"SELECT @val",
1505			[]QueryParameter{{"val", "ABC"}},
1506			[]Value{"ABC"},
1507			"ABC",
1508		},
1509		{
1510			"SELECT @val",
1511			[]QueryParameter{{"val", []byte("foo")}},
1512			[]Value{[]byte("foo")},
1513			[]byte("foo"),
1514		},
1515		{
1516			"SELECT @val",
1517			[]QueryParameter{{"val", ts}},
1518			[]Value{ts},
1519			ts,
1520		},
1521		{
1522			"SELECT @val",
1523			[]QueryParameter{{"val", []time.Time{ts, ts}}},
1524			[]Value{[]Value{ts, ts}},
1525			[]interface{}{ts, ts},
1526		},
1527		{
1528			"SELECT @val",
1529			[]QueryParameter{{"val", dtm}},
1530			[]Value{civil.DateTime{Date: d, Time: rtm}},
1531			civil.DateTime{Date: d, Time: rtm},
1532		},
1533		{
1534			"SELECT @val",
1535			[]QueryParameter{{"val", d}},
1536			[]Value{d},
1537			d,
1538		},
1539		{
1540			"SELECT @val",
1541			[]QueryParameter{{"val", tm}},
1542			[]Value{rtm},
1543			rtm,
1544		},
1545		{
1546			"SELECT @val",
1547			[]QueryParameter{{"val", s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}},
1548			[]Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}},
1549			map[string]interface{}{
1550				"Timestamp":   ts,
1551				"StringArray": []interface{}{"a", "b"},
1552				"SubStruct":   map[string]interface{}{"String": "c"},
1553				"SubStructArray": []interface{}{
1554					map[string]interface{}{"String": "d"},
1555					map[string]interface{}{"String": "e"},
1556				},
1557			},
1558		},
1559		{
1560			"SELECT @val.Timestamp, @val.SubStruct.String",
1561			[]QueryParameter{{"val", s{Timestamp: ts, SubStruct: ss{"a"}}}},
1562			[]Value{ts, "a"},
1563			map[string]interface{}{
1564				"Timestamp":      ts,
1565				"SubStruct":      map[string]interface{}{"String": "a"},
1566				"StringArray":    nil,
1567				"SubStructArray": nil,
1568			},
1569		},
1570	}
1571	for _, c := range testCases {
1572		q := client.Query(c.query)
1573		q.Parameters = c.parameters
1574		job, err := q.Run(ctx)
1575		if err != nil {
1576			t.Fatal(err)
1577		}
1578		if job.LastStatus() == nil {
1579			t.Error("no LastStatus")
1580		}
1581		it, err := job.Read(ctx)
1582		if err != nil {
1583			t.Fatal(err)
1584		}
1585		checkRead(t, "QueryParameters", it, [][]Value{c.wantRow})
1586		config, err := job.Config()
1587		if err != nil {
1588			t.Fatal(err)
1589		}
1590		got := config.(*QueryConfig).Parameters[0].Value
1591		if !testutil.Equal(got, c.wantConfig) {
1592			t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
1593				c.parameters[0].Value, got, c.wantConfig)
1594		}
1595	}
1596}
1597
1598func TestIntegration_QueryDryRun(t *testing.T) {
1599	if client == nil {
1600		t.Skip("Integration tests skipped")
1601	}
1602	ctx := context.Background()
1603	q := client.Query("SELECT word from " + stdName + " LIMIT 10")
1604	q.DryRun = true
1605	job, err := q.Run(ctx)
1606	if err != nil {
1607		t.Fatal(err)
1608	}
1609
1610	s := job.LastStatus()
1611	if s.State != Done {
1612		t.Errorf("state is %v, expected Done", s.State)
1613	}
1614	if s.Statistics == nil {
1615		t.Fatal("no statistics")
1616	}
1617	if s.Statistics.Details.(*QueryStatistics).Schema == nil {
1618		t.Fatal("no schema")
1619	}
1620	if s.Statistics.Details.(*QueryStatistics).TotalBytesProcessedAccuracy == "" {
1621		t.Fatal("no cost accuracy")
1622	}
1623}
1624
1625func TestIntegration_ExtractExternal(t *testing.T) {
1626	// Create a table, extract it to GCS, then query it externally.
1627	if client == nil {
1628		t.Skip("Integration tests skipped")
1629	}
1630	ctx := context.Background()
1631	schema := Schema{
1632		{Name: "name", Type: StringFieldType},
1633		{Name: "num", Type: IntegerFieldType},
1634	}
1635	table := newTable(t, schema)
1636	defer table.Delete(ctx)
1637
1638	// Insert table data.
1639	sql := fmt.Sprintf(`INSERT %s.%s (name, num)
1640		                VALUES ('a', 1), ('b', 2), ('c', 3)`,
1641		table.DatasetID, table.TableID)
1642	if err := runQueryJob(ctx, sql); err != nil {
1643		t.Fatal(err)
1644	}
1645	// Extract to a GCS object as CSV.
1646	bucketName := testutil.ProjID()
1647	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
1648	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
1649	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
1650	gr := NewGCSReference(uri)
1651	gr.DestinationFormat = CSV
1652	e := table.ExtractorTo(gr)
1653	job, err := e.Run(ctx)
1654	if err != nil {
1655		t.Fatal(err)
1656	}
1657	conf, err := job.Config()
1658	if err != nil {
1659		t.Fatal(err)
1660	}
1661	config, ok := conf.(*ExtractConfig)
1662	if !ok {
1663		t.Fatalf("got %T, want ExtractConfig", conf)
1664	}
1665	diff := testutil.Diff(config, &e.ExtractConfig,
1666		cmp.AllowUnexported(Table{}),
1667		cmpopts.IgnoreUnexported(Client{}))
1668	if diff != "" {
1669		t.Errorf("got=-, want=+:\n%s", diff)
1670	}
1671	if err := wait(ctx, job); err != nil {
1672		t.Fatal(err)
1673	}
1674
1675	edc := &ExternalDataConfig{
1676		SourceFormat: CSV,
1677		SourceURIs:   []string{uri},
1678		Schema:       schema,
1679		Options: &CSVOptions{
1680			SkipLeadingRows: 1,
1681			// This is the default. Since we use edc as an expectation later on,
1682			// let's just be explicit.
1683			FieldDelimiter: ",",
1684		},
1685	}
1686	// Query that CSV file directly.
1687	q := client.Query("SELECT * FROM csv")
1688	q.TableDefinitions = map[string]ExternalData{"csv": edc}
1689	wantRows := [][]Value{
1690		{"a", int64(1)},
1691		{"b", int64(2)},
1692		{"c", int64(3)},
1693	}
1694	iter, err := q.Read(ctx)
1695	if err != nil {
1696		t.Fatal(err)
1697	}
1698	checkReadAndTotalRows(t, "external query", iter, wantRows)
1699
1700	// Make a table pointing to the file, and query it.
1701	// BigQuery does not allow a Table.Read on an external table.
1702	table = dataset.Table(tableIDs.New())
1703	err = table.Create(context.Background(), &TableMetadata{
1704		Schema:             schema,
1705		ExpirationTime:     testTableExpiration,
1706		ExternalDataConfig: edc,
1707	})
1708	if err != nil {
1709		t.Fatal(err)
1710	}
1711	q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
1712	iter, err = q.Read(ctx)
1713	if err != nil {
1714		t.Fatal(err)
1715	}
1716	checkReadAndTotalRows(t, "external table", iter, wantRows)
1717
1718	// While we're here, check that the table metadata is correct.
1719	md, err := table.Metadata(ctx)
1720	if err != nil {
1721		t.Fatal(err)
1722	}
1723	// One difference: since BigQuery returns the schema as part of the ordinary
1724	// table metadata, it does not populate ExternalDataConfig.Schema.
1725	md.ExternalDataConfig.Schema = md.Schema
1726	if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" {
1727		t.Errorf("got=-, want=+\n%s", diff)
1728	}
1729}
1730
1731func TestIntegration_ReadNullIntoStruct(t *testing.T) {
1732	// Reading a null into a struct field should return an error (not panic).
1733	if client == nil {
1734		t.Skip("Integration tests skipped")
1735	}
1736	ctx := context.Background()
1737	table := newTable(t, schema)
1738	defer table.Delete(ctx)
1739
1740	ins := table.Inserter()
1741	row := &ValuesSaver{
1742		Schema: schema,
1743		Row:    []Value{nil, []Value{}, []Value{nil}},
1744	}
1745	if err := ins.Put(ctx, []*ValuesSaver{row}); err != nil {
1746		t.Fatal(putError(err))
1747	}
1748	if err := waitForRow(ctx, table); err != nil {
1749		t.Fatal(err)
1750	}
1751
1752	q := client.Query(fmt.Sprintf("select name from %s", table.TableID))
1753	q.DefaultProjectID = dataset.ProjectID
1754	q.DefaultDatasetID = dataset.DatasetID
1755	it, err := q.Read(ctx)
1756	if err != nil {
1757		t.Fatal(err)
1758	}
1759	type S struct{ Name string }
1760	var s S
1761	if err := it.Next(&s); err == nil {
1762		t.Fatal("got nil, want error")
1763	}
1764}
1765
1766const (
1767	stdName    = "`bigquery-public-data.samples.shakespeare`"
1768	legacyName = "[bigquery-public-data:samples.shakespeare]"
1769)
1770
1771// These tests exploit the fact that the two SQL versions have different syntaxes for
1772// fully-qualified table names.
1773var useLegacySQLTests = []struct {
1774	t           string // name of table
1775	std, legacy bool   // use standard/legacy SQL
1776	err         bool   // do we expect an error?
1777}{
1778	{t: legacyName, std: false, legacy: true, err: false},
1779	{t: legacyName, std: true, legacy: false, err: true},
1780	{t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default
1781	{t: legacyName, std: true, legacy: true, err: true},
1782	{t: stdName, std: false, legacy: true, err: true},
1783	{t: stdName, std: true, legacy: false, err: false},
1784	{t: stdName, std: false, legacy: false, err: false}, // standard SQL is default
1785	{t: stdName, std: true, legacy: true, err: true},
1786}
1787
1788func TestIntegration_QueryUseLegacySQL(t *testing.T) {
1789	// Test the UseLegacySQL and UseStandardSQL options for queries.
1790	if client == nil {
1791		t.Skip("Integration tests skipped")
1792	}
1793	ctx := context.Background()
1794	for _, test := range useLegacySQLTests {
1795		q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t))
1796		q.UseStandardSQL = test.std
1797		q.UseLegacySQL = test.legacy
1798		_, err := q.Read(ctx)
1799		gotErr := err != nil
1800		if gotErr && !test.err {
1801			t.Errorf("%+v:\nunexpected error: %v", test, err)
1802		} else if !gotErr && test.err {
1803			t.Errorf("%+v:\nsucceeded, but want error", test)
1804		}
1805	}
1806}
1807
1808func TestIntegration_TableUseLegacySQL(t *testing.T) {
1809	// Test UseLegacySQL and UseStandardSQL for Table.Create.
1810	if client == nil {
1811		t.Skip("Integration tests skipped")
1812	}
1813	ctx := context.Background()
1814	table := newTable(t, schema)
1815	defer table.Delete(ctx)
1816	for i, test := range useLegacySQLTests {
1817		view := dataset.Table(fmt.Sprintf("t_view_%d", i))
1818		tm := &TableMetadata{
1819			ViewQuery:      fmt.Sprintf("SELECT word from %s", test.t),
1820			UseStandardSQL: test.std,
1821			UseLegacySQL:   test.legacy,
1822		}
1823		err := view.Create(ctx, tm)
1824		gotErr := err != nil
1825		if gotErr && !test.err {
1826			t.Errorf("%+v:\nunexpected error: %v", test, err)
1827		} else if !gotErr && test.err {
1828			t.Errorf("%+v:\nsucceeded, but want error", test)
1829		}
1830		_ = view.Delete(ctx)
1831	}
1832}
1833
1834func TestIntegration_ListJobs(t *testing.T) {
1835	// It's difficult to test the list of jobs, because we can't easily
1836	// control what's in it. Also, there are many jobs in the test project,
1837	// and it takes considerable time to list them all.
1838	if client == nil {
1839		t.Skip("Integration tests skipped")
1840	}
1841	ctx := context.Background()
1842
1843	// About all we can do is list a few jobs.
1844	const max = 20
1845	var jobs []*Job
1846	it := client.Jobs(ctx)
1847	for {
1848		job, err := it.Next()
1849		if err == iterator.Done {
1850			break
1851		}
1852		if err != nil {
1853			t.Fatal(err)
1854		}
1855		jobs = append(jobs, job)
1856		if len(jobs) >= max {
1857			break
1858		}
1859	}
1860	// We expect that there is at least one job in the last few months.
1861	if len(jobs) == 0 {
1862		t.Fatal("did not get any jobs")
1863	}
1864}
1865
1866const tokyo = "asia-northeast1"
1867
1868func TestIntegration_Location(t *testing.T) {
1869	if client == nil {
1870		t.Skip("Integration tests skipped")
1871	}
1872	client.Location = ""
1873	testLocation(t, tokyo)
1874	client.Location = tokyo
1875	defer func() {
1876		client.Location = ""
1877	}()
1878	testLocation(t, "")
1879}
1880
1881func testLocation(t *testing.T, loc string) {
1882	ctx := context.Background()
1883	tokyoDataset := client.Dataset("tokyo")
1884	err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc})
1885	if err != nil && !hasStatusCode(err, 409) { // 409 = already exists
1886		t.Fatal(err)
1887	}
1888	md, err := tokyoDataset.Metadata(ctx)
1889	if err != nil {
1890		t.Fatal(err)
1891	}
1892	if md.Location != tokyo {
1893		t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo)
1894	}
1895	table := tokyoDataset.Table(tableIDs.New())
1896	err = table.Create(context.Background(), &TableMetadata{
1897		Schema: Schema{
1898			{Name: "name", Type: StringFieldType},
1899			{Name: "nums", Type: IntegerFieldType},
1900		},
1901		ExpirationTime: testTableExpiration,
1902	})
1903	if err != nil {
1904		t.Fatal(err)
1905	}
1906	defer table.Delete(ctx)
1907	loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n")))
1908	loader.Location = loc
1909	job, err := loader.Run(ctx)
1910	if err != nil {
1911		t.Fatal("loader.Run", err)
1912	}
1913	if job.Location() != tokyo {
1914		t.Fatalf("job location: got %s, want %s", job.Location(), tokyo)
1915	}
1916	_, err = client.JobFromID(ctx, job.ID())
1917	if client.Location == "" && err == nil {
1918		t.Error("JobFromID with Tokyo job, no client location: want error, got nil")
1919	}
1920	if client.Location != "" && err != nil {
1921		t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err)
1922	}
1923	_, err = client.JobFromIDLocation(ctx, job.ID(), "US")
1924	if err == nil {
1925		t.Error("JobFromIDLocation with US: want error, got nil")
1926	}
1927	job2, err := client.JobFromIDLocation(ctx, job.ID(), loc)
1928	if loc == tokyo && err != nil {
1929		t.Errorf("loc=tokyo: %v", err)
1930	}
1931	if loc == "" && err == nil {
1932		t.Error("loc empty: got nil, want error")
1933	}
1934	if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) {
1935		t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo)
1936	}
1937	if err := wait(ctx, job); err != nil {
1938		t.Fatal(err)
1939	}
1940	// Cancel should succeed even if the job is done.
1941	if err := job.Cancel(ctx); err != nil {
1942		t.Fatal(err)
1943	}
1944
1945	q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
1946	q.Location = loc
1947	iter, err := q.Read(ctx)
1948	if err != nil {
1949		t.Fatal(err)
1950	}
1951	wantRows := [][]Value{
1952		{"a", int64(0)},
1953		{"b", int64(1)},
1954		{"c", int64(2)},
1955	}
1956	checkRead(t, "location", iter, wantRows)
1957
1958	table2 := tokyoDataset.Table(tableIDs.New())
1959	copier := table2.CopierFrom(table)
1960	copier.Location = loc
1961	if _, err := copier.Run(ctx); err != nil {
1962		t.Fatal(err)
1963	}
1964	bucketName := testutil.ProjID()
1965	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
1966	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
1967	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
1968	gr := NewGCSReference(uri)
1969	gr.DestinationFormat = CSV
1970	e := table.ExtractorTo(gr)
1971	e.Location = loc
1972	if _, err := e.Run(ctx); err != nil {
1973		t.Fatal(err)
1974	}
1975}
1976
1977func TestIntegration_NumericErrors(t *testing.T) {
1978	// Verify that the service returns an error for a big.Rat that's too large.
1979	if client == nil {
1980		t.Skip("Integration tests skipped")
1981	}
1982	ctx := context.Background()
1983	schema := Schema{{Name: "n", Type: NumericFieldType}}
1984	table := newTable(t, schema)
1985	defer table.Delete(ctx)
1986	tooBigRat := &big.Rat{}
1987	if _, ok := tooBigRat.SetString("1e40"); !ok {
1988		t.Fatal("big.Rat.SetString failed")
1989	}
1990	ins := table.Inserter()
1991	err := ins.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}})
1992	if err == nil {
1993		t.Fatal("got nil, want error")
1994	}
1995}
1996
1997func TestIntegration_QueryErrors(t *testing.T) {
1998	// Verify that a bad query returns an appropriate error.
1999	if client == nil {
2000		t.Skip("Integration tests skipped")
2001	}
2002	ctx := context.Background()
2003	q := client.Query("blah blah broken")
2004	_, err := q.Read(ctx)
2005	const want = "invalidQuery"
2006	if !strings.Contains(err.Error(), want) {
2007		t.Fatalf("got %q, want substring %q", err, want)
2008	}
2009}
2010
2011func TestIntegration_ModelLifecycle(t *testing.T) {
2012	if client == nil {
2013		t.Skip("Integration tests skipped")
2014	}
2015	ctx := context.Background()
2016
2017	// Create a model via a CREATE MODEL query
2018	modelID := modelIDs.New()
2019	model := dataset.Model(modelID)
2020	modelRef := fmt.Sprintf("%s.%s.%s", dataset.ProjectID, dataset.DatasetID, modelID)
2021
2022	sql := fmt.Sprintf(`
2023		CREATE MODEL `+"`%s`"+`
2024		OPTIONS (
2025			model_type='linear_reg',
2026			max_iteration=1,
2027			learn_rate=0.4,
2028			learn_rate_strategy='constant'
2029		) AS (
2030			SELECT 'a' AS f1, 2.0 AS label
2031			UNION ALL
2032			SELECT 'b' AS f1, 3.8 AS label
2033		)`, modelRef)
2034	if err := runQueryJob(ctx, sql); err != nil {
2035		t.Fatal(err)
2036	}
2037	defer model.Delete(ctx)
2038
2039	// Get the model metadata.
2040	curMeta, err := model.Metadata(ctx)
2041	if err != nil {
2042		t.Fatalf("couldn't get metadata: %v", err)
2043	}
2044
2045	want := "LINEAR_REGRESSION"
2046	if curMeta.Type != want {
2047		t.Errorf("Model type mismatch.  Want %s got %s", curMeta.Type, want)
2048	}
2049
2050	// Ensure training metadata is available.
2051	runs := curMeta.RawTrainingRuns()
2052	if runs == nil {
2053		t.Errorf("training runs unpopulated.")
2054	}
2055	labelCols, err := curMeta.RawLabelColumns()
2056	if err != nil {
2057		t.Fatalf("failed to get label cols: %v", err)
2058	}
2059	if labelCols == nil {
2060		t.Errorf("label column information unpopulated.")
2061	}
2062	featureCols, err := curMeta.RawFeatureColumns()
2063	if err != nil {
2064		t.Fatalf("failed to get feature cols: %v", err)
2065	}
2066	if featureCols == nil {
2067		t.Errorf("feature column information unpopulated.")
2068	}
2069
2070	// Update mutable fields via API.
2071	expiry := time.Now().Add(24 * time.Hour).Truncate(time.Millisecond)
2072
2073	upd := ModelMetadataToUpdate{
2074		Description:    "new",
2075		Name:           "friendly",
2076		ExpirationTime: expiry,
2077	}
2078
2079	newMeta, err := model.Update(ctx, upd, curMeta.ETag)
2080	if err != nil {
2081		t.Fatalf("failed to update: %v", err)
2082	}
2083
2084	want = "new"
2085	if newMeta.Description != want {
2086		t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
2087	}
2088	want = "friendly"
2089	if newMeta.Name != want {
2090		t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
2091	}
2092	if newMeta.ExpirationTime != expiry {
2093		t.Fatalf("ExpirationTime not updated.  got %v want %v", newMeta.ExpirationTime, expiry)
2094	}
2095
2096	// Ensure presence when enumerating the model list.
2097	it := dataset.Models(ctx)
2098	seen := false
2099	for {
2100		mdl, err := it.Next()
2101		if err == iterator.Done {
2102			break
2103		}
2104		if err != nil {
2105			t.Fatal(err)
2106		}
2107		if mdl.ModelID == modelID {
2108			seen = true
2109		}
2110	}
2111	if !seen {
2112		t.Fatal("model not listed in dataset")
2113	}
2114
2115	// Delete the model.
2116	if err := model.Delete(ctx); err != nil {
2117		t.Fatalf("failed to delete model: %v", err)
2118	}
2119}
2120
2121func TestIntegration_RoutineScalarUDF(t *testing.T) {
2122	if client == nil {
2123		t.Skip("Integration tests skipped")
2124	}
2125	ctx := context.Background()
2126
2127	// Create a scalar UDF routine via API.
2128	routineID := routineIDs.New()
2129	routine := dataset.Routine(routineID)
2130	err := routine.Create(ctx, &RoutineMetadata{
2131		Type:     "SCALAR_FUNCTION",
2132		Language: "SQL",
2133		Body:     "x * 3",
2134		Arguments: []*RoutineArgument{
2135			{
2136				Name: "x",
2137				DataType: &StandardSQLDataType{
2138					TypeKind: "INT64",
2139				},
2140			},
2141		},
2142	})
2143	if err != nil {
2144		t.Fatalf("Create: %v", err)
2145	}
2146}
2147
2148func TestIntegration_RoutineComplexTypes(t *testing.T) {
2149	if client == nil {
2150		t.Skip("Integration tests skipped")
2151	}
2152	ctx := context.Background()
2153
2154	routineID := routineIDs.New()
2155	routine := dataset.Routine(routineID)
2156	sql := fmt.Sprintf(`
2157		CREATE FUNCTION `+"`%s`("+`
2158			arr ARRAY<STRUCT<name STRING, val INT64>>
2159		  ) AS (
2160			  (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
2161		  )`,
2162		routine.FullyQualifiedName())
2163	if err := runQueryJob(ctx, sql); err != nil {
2164		t.Fatal(err)
2165	}
2166	defer routine.Delete(ctx)
2167
2168	meta, err := routine.Metadata(ctx)
2169	if err != nil {
2170		t.Fatalf("Metadata: %v", err)
2171	}
2172	if meta.Type != "SCALAR_FUNCTION" {
2173		t.Fatalf("routine type mismatch, got %s want SCALAR_FUNCTION", meta.Type)
2174	}
2175	if meta.Language != "SQL" {
2176		t.Fatalf("language type mismatch, got  %s want SQL", meta.Language)
2177	}
2178	want := []*RoutineArgument{
2179		{
2180			Name: "arr",
2181			DataType: &StandardSQLDataType{
2182				TypeKind: "ARRAY",
2183				ArrayElementType: &StandardSQLDataType{
2184					TypeKind: "STRUCT",
2185					StructType: &StandardSQLStructType{
2186						Fields: []*StandardSQLField{
2187							{
2188								Name: "name",
2189								Type: &StandardSQLDataType{
2190									TypeKind: "STRING",
2191								},
2192							},
2193							{
2194								Name: "val",
2195								Type: &StandardSQLDataType{
2196									TypeKind: "INT64",
2197								},
2198							},
2199						},
2200					},
2201				},
2202			},
2203		},
2204	}
2205	if diff := testutil.Diff(meta.Arguments, want); diff != "" {
2206		t.Fatalf("%+v: -got, +want:\n%s", meta.Arguments, diff)
2207	}
2208}
2209
2210func TestIntegration_RoutineLifecycle(t *testing.T) {
2211	if client == nil {
2212		t.Skip("Integration tests skipped")
2213	}
2214	ctx := context.Background()
2215
2216	// Create a scalar UDF routine via a CREATE FUNCTION query
2217	routineID := routineIDs.New()
2218	routine := dataset.Routine(routineID)
2219
2220	sql := fmt.Sprintf(`
2221		CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
2222		routine.FullyQualifiedName())
2223	if err := runQueryJob(ctx, sql); err != nil {
2224		t.Fatal(err)
2225	}
2226	defer routine.Delete(ctx)
2227
2228	// Get the routine metadata.
2229	curMeta, err := routine.Metadata(ctx)
2230	if err != nil {
2231		t.Fatalf("couldn't get metadata: %v", err)
2232	}
2233
2234	want := "SCALAR_FUNCTION"
2235	if curMeta.Type != want {
2236		t.Errorf("Routine type mismatch.  got %s want %s", curMeta.Type, want)
2237	}
2238
2239	want = "SQL"
2240	if curMeta.Language != want {
2241		t.Errorf("Language mismatch. got %s want %s", curMeta.Language, want)
2242	}
2243
2244	// Perform an update to change the routine body.
2245	want = "x * 4"
2246	// during beta, update doesn't allow partial updates.  Provide all fields.
2247	newMeta, err := routine.Update(ctx, &RoutineMetadataToUpdate{
2248		Body:       want,
2249		Arguments:  curMeta.Arguments,
2250		ReturnType: curMeta.ReturnType,
2251		Type:       curMeta.Type,
2252	}, curMeta.ETag)
2253	if err != nil {
2254		t.Fatalf("Update: %v", err)
2255	}
2256	if newMeta.Body != want {
2257		t.Fatalf("Update failed.  want %s got %s", want, newMeta.Body)
2258	}
2259
2260	// Ensure presence when enumerating the model list.
2261	it := dataset.Routines(ctx)
2262	seen := false
2263	for {
2264		r, err := it.Next()
2265		if err == iterator.Done {
2266			break
2267		}
2268		if err != nil {
2269			t.Fatal(err)
2270		}
2271		if r.RoutineID == routineID {
2272			seen = true
2273		}
2274	}
2275	if !seen {
2276		t.Fatal("routine not listed in dataset")
2277	}
2278
2279	// Delete the model.
2280	if err := routine.Delete(ctx); err != nil {
2281		t.Fatalf("failed to delete routine: %v", err)
2282	}
2283}
2284
2285// Creates a new, temporary table with a unique name and the given schema.
2286func newTable(t *testing.T, s Schema) *Table {
2287	table := dataset.Table(tableIDs.New())
2288	err := table.Create(context.Background(), &TableMetadata{
2289		Schema:         s,
2290		ExpirationTime: testTableExpiration,
2291	})
2292	if err != nil {
2293		t.Fatal(err)
2294	}
2295	return table
2296}
2297
2298func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) {
2299	if msg2, ok := compareRead(it, want, false); !ok {
2300		t.Errorf("%s: %s", msg, msg2)
2301	}
2302}
2303
2304func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) {
2305	if msg2, ok := compareRead(it, want, true); !ok {
2306		t.Errorf("%s: %s", msg, msg2)
2307	}
2308}
2309
2310func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) {
2311	got, _, totalRows, err := readAll(it)
2312	if err != nil {
2313		return err.Error(), false
2314	}
2315	if len(got) != len(want) {
2316		return fmt.Sprintf("got %d rows, want %d", len(got), len(want)), false
2317	}
2318	if compareTotalRows && len(got) != int(totalRows) {
2319		return fmt.Sprintf("got %d rows, but totalRows = %d", len(got), totalRows), false
2320	}
2321	sort.Sort(byCol0(got))
2322	for i, r := range got {
2323		gotRow := []Value(r)
2324		wantRow := want[i]
2325		if !testutil.Equal(gotRow, wantRow) {
2326			return fmt.Sprintf("#%d: got %#v, want %#v", i, gotRow, wantRow), false
2327		}
2328	}
2329	return "", true
2330}
2331
2332func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) {
2333	var (
2334		rows      [][]Value
2335		schema    Schema
2336		totalRows uint64
2337	)
2338	for {
2339		var vals []Value
2340		err := it.Next(&vals)
2341		if err == iterator.Done {
2342			return rows, schema, totalRows, nil
2343		}
2344		if err != nil {
2345			return nil, nil, 0, err
2346		}
2347		rows = append(rows, vals)
2348		schema = it.Schema
2349		totalRows = it.TotalRows
2350	}
2351}
2352
2353type byCol0 [][]Value
2354
2355func (b byCol0) Len() int      { return len(b) }
2356func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
2357func (b byCol0) Less(i, j int) bool {
2358	switch a := b[i][0].(type) {
2359	case string:
2360		return a < b[j][0].(string)
2361	case civil.Date:
2362		return a.Before(b[j][0].(civil.Date))
2363	default:
2364		panic("unknown type")
2365	}
2366}
2367
2368func hasStatusCode(err error, code int) bool {
2369	if e, ok := err.(*googleapi.Error); ok && e.Code == code {
2370		return true
2371	}
2372	return false
2373}
2374
2375// wait polls the job until it is complete or an error is returned.
2376func wait(ctx context.Context, job *Job) error {
2377	status, err := job.Wait(ctx)
2378	if err != nil {
2379		return err
2380	}
2381	if status.Err() != nil {
2382		return fmt.Errorf("job status error: %#v", status.Err())
2383	}
2384	if status.Statistics == nil {
2385		return errors.New("nil Statistics")
2386	}
2387	if status.Statistics.EndTime.IsZero() {
2388		return errors.New("EndTime is zero")
2389	}
2390	if status.Statistics.Details == nil {
2391		return errors.New("nil Statistics.Details")
2392	}
2393	return nil
2394}
2395
2396// waitForRow polls the table until it contains a row.
2397// TODO(jba): use internal.Retry.
2398func waitForRow(ctx context.Context, table *Table) error {
2399	for {
2400		it := table.Read(ctx)
2401		var v []Value
2402		err := it.Next(&v)
2403		if err == nil {
2404			return nil
2405		}
2406		if err != iterator.Done {
2407			return err
2408		}
2409		time.Sleep(1 * time.Second)
2410	}
2411}
2412
2413func putError(err error) string {
2414	pme, ok := err.(PutMultiError)
2415	if !ok {
2416		return err.Error()
2417	}
2418	var msgs []string
2419	for _, err := range pme {
2420		msgs = append(msgs, err.Error())
2421	}
2422	return strings.Join(msgs, "\n")
2423}
2424