1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"encoding/json"
19	"errors"
20	"flag"
21	"fmt"
22	"log"
23	"math/big"
24	"net/http"
25	"os"
26	"sort"
27	"strings"
28	"testing"
29	"time"
30
31	"github.com/google/go-cmp/cmp"
32	"github.com/google/go-cmp/cmp/cmpopts"
33	gax "github.com/googleapis/gax-go"
34
35	"cloud.google.com/go/civil"
36	"cloud.google.com/go/httpreplay"
37	"cloud.google.com/go/internal"
38	"cloud.google.com/go/internal/pretty"
39	"cloud.google.com/go/internal/testutil"
40	"cloud.google.com/go/internal/uid"
41	"cloud.google.com/go/storage"
42	"golang.org/x/net/context"
43	"google.golang.org/api/googleapi"
44	"google.golang.org/api/iterator"
45	"google.golang.org/api/option"
46)
47
// replayFilename is the file that initIntegrationTest records RPCs to (with
// -record) or replays them from (with -short).
const replayFilename = "bigquery.replay"

// record is the -record flag. When set, initIntegrationTest records the RPCs
// made against the real backend into replayFilename.
var record = flag.Bool("record", false, "record RPCs")

var (
	// client and storageClient are the shared clients used by every
	// integration test. They are assigned by initIntegrationTest and remain nil
	// when the integration tests are skipped.
	client        *Client
	storageClient *storage.Client
	// dataset is the dataset created by initTestState for the whole test run.
	dataset *Dataset
	// schema is the default table schema passed to newTable by most tests.
	schema = Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "nums", Type: IntegerFieldType, Repeated: true},
		{Name: "rec", Type: RecordFieldType, Schema: Schema{
			{Name: "bool", Type: BooleanFieldType},
		}},
	}
	// testTableExpiration is set by initTestState to ten minutes after the
	// test start time, rounded to the second.
	testTableExpiration time.Time
	// datasetIDs and tableIDs generate dataset and table IDs; both are
	// created by initTestState.
	datasetIDs, tableIDs *uid.Space
)
66
67// Note: integration tests cannot be run in parallel, because TestIntegration_Location
68// modifies the client.
69
70func TestMain(m *testing.M) {
71	cleanup := initIntegrationTest()
72	r := m.Run()
73	cleanup()
74	os.Exit(r)
75}
76
77func getClient(t *testing.T) *Client {
78	if client == nil {
79		t.Skip("Integration tests skipped")
80	}
81	return client
82}
83
// initIntegrationTest sets up the package-level client and storageClient and,
// if integration tests will be run, creates a unique dataset for them (via
// initTestState). It returns a cleanup function that the caller must invoke
// after the tests have finished.
//
// The mode is chosen from the -short and -record flags:
//   - -short and -record together are rejected.
//   - -short with a usable replay file and a project ID replays recorded RPCs.
//   - -short otherwise leaves the clients nil, so tests skip themselves.
//   - without -short, tests run against the real backend, optionally
//     recording the RPCs when -record is set.
func initIntegrationTest() func() {
	ctx := context.Background()
	flag.Parse() // needed for testing.Short()
	projID := testutil.ProjID()
	switch {
	case testing.Short() && *record:
		log.Fatal("cannot combine -short and -record")
		// Not reached at run time; present so that every case returns a value.
		return func() {}

	case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "":
		// go test -short with a replay file will replay the integration tests if the
		// environment variables are set.
		log.Printf("replaying from %s", replayFilename)
		httpreplay.DebugHeaders()
		replayer, err := httpreplay.NewReplayer(replayFilename)
		if err != nil {
			log.Fatal(err)
		}
		// The replay's initial state is decoded as a JSON time. The recording
		// branch below stores the JSON-encoded start time when it creates the
		// recorder, so this is presumably that same timestamp.
		var t time.Time
		if err := json.Unmarshal(replayer.Initial(), &t); err != nil {
			log.Fatal(err)
		}
		hc, err := replayer.Client(ctx) // no creds needed
		if err != nil {
			log.Fatal(err)
		}
		client, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		// Seed the test state with the decoded time rather than time.Now().
		cleanup := initTestState(client, t)
		return func() {
			cleanup()
			_ = replayer.Close() // No actionable error returned.
		}

	case testing.Short():
		// go test -short without a replay file skips the integration tests.
		// If a replay file and project ID are available, the previous case was
		// only missed because httpreplay.Supported() returned false.
		if testutil.CanReplay(replayFilename) && projID != "" {
			log.Print("replay not supported for Go versions before 1.8")
		}
		client = nil
		storageClient = nil
		return func() {}

	default: // Run integration tests against a real backend.
		ts := testutil.TokenSource(ctx, Scope)
		if ts == nil {
			// No token source: the clients stay nil, so tests skip themselves.
			log.Println("Integration tests skipped. See CONTRIBUTING.md for details")
			return func() {}
		}
		bqOpt := option.WithTokenSource(ts)
		sOpt := option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))
		cleanup := func() {}
		now := time.Now().UTC()
		if *record {
			if !httpreplay.Supported() {
				log.Print("record not supported for Go versions before 1.8")
			} else {
				// Store the start time as the recording's initial state so that
				// a later replay can seed its test state with the same time.
				nowBytes, err := json.Marshal(now)
				if err != nil {
					log.Fatal(err)
				}
				recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes)
				if err != nil {
					log.Fatalf("could not record: %v", err)
				}
				log.Printf("recording to %s", replayFilename)
				// Swap the token-source options for HTTP clients obtained from
				// the recorder, one for BigQuery and one for storage.
				hc, err := recorder.Client(ctx, bqOpt)
				if err != nil {
					log.Fatal(err)
				}
				bqOpt = option.WithHTTPClient(hc)
				hc, err = recorder.Client(ctx, sOpt)
				if err != nil {
					log.Fatal(err)
				}
				sOpt = option.WithHTTPClient(hc)
				cleanup = func() {
					if err := recorder.Close(); err != nil {
						log.Printf("saving recording: %v", err)
					}
				}
			}
		}
		var err error
		client, err = NewClient(ctx, projID, bqOpt)
		if err != nil {
			log.Fatalf("NewClient: %v", err)
		}
		storageClient, err = storage.NewClient(ctx, sOpt)
		if err != nil {
			log.Fatalf("storage.NewClient: %v", err)
		}
		c := initTestState(client, now)
		// Run the test-state cleanup first, then close the recorder (if any).
		return func() { c(); cleanup() }
	}
}
188
189func initTestState(client *Client, t time.Time) func() {
190	// BigQuery does not accept hyphens in dataset or table IDs, so we create IDs
191	// with underscores.
192	ctx := context.Background()
193	opts := &uid.Options{Sep: '_', Time: t}
194	datasetIDs = uid.NewSpace("dataset", opts)
195	tableIDs = uid.NewSpace("table", opts)
196	testTableExpiration = t.Add(10 * time.Minute).Round(time.Second)
197	// For replayability, seed the random source with t.
198	Seed(t.UnixNano())
199
200	dataset = client.Dataset(datasetIDs.New())
201	if err := dataset.Create(ctx, nil); err != nil {
202		log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err)
203	}
204	return func() {
205		if err := dataset.DeleteWithContents(ctx); err != nil {
206			log.Printf("could not delete %s", dataset.DatasetID)
207		}
208	}
209}
210
211func TestIntegration_TableCreate(t *testing.T) {
212	// Check that creating a record field with an empty schema is an error.
213	if client == nil {
214		t.Skip("Integration tests skipped")
215	}
216	table := dataset.Table("t_bad")
217	schema := Schema{
218		{Name: "rec", Type: RecordFieldType, Schema: Schema{}},
219	}
220	err := table.Create(context.Background(), &TableMetadata{
221		Schema:         schema,
222		ExpirationTime: testTableExpiration.Add(5 * time.Minute),
223	})
224	if err == nil {
225		t.Fatal("want error, got nil")
226	}
227	if !hasStatusCode(err, http.StatusBadRequest) {
228		t.Fatalf("want a 400 error, got %v", err)
229	}
230}
231
232func TestIntegration_TableCreateView(t *testing.T) {
233	if client == nil {
234		t.Skip("Integration tests skipped")
235	}
236	ctx := context.Background()
237	table := newTable(t, schema)
238	defer table.Delete(ctx)
239
240	// Test that standard SQL views work.
241	view := dataset.Table("t_view_standardsql")
242	query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`",
243		dataset.ProjectID, dataset.DatasetID, table.TableID)
244	err := view.Create(context.Background(), &TableMetadata{
245		ViewQuery:      query,
246		UseStandardSQL: true,
247	})
248	if err != nil {
249		t.Fatalf("table.create: Did not expect an error, got: %v", err)
250	}
251	if err := view.Delete(ctx); err != nil {
252		t.Fatal(err)
253	}
254}
255
256func TestIntegration_TableMetadata(t *testing.T) {
257	if client == nil {
258		t.Skip("Integration tests skipped")
259	}
260	ctx := context.Background()
261	table := newTable(t, schema)
262	defer table.Delete(ctx)
263	// Check table metadata.
264	md, err := table.Metadata(ctx)
265	if err != nil {
266		t.Fatal(err)
267	}
268	// TODO(jba): check md more thorougly.
269	if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want {
270		t.Errorf("metadata.FullID: got %q, want %q", got, want)
271	}
272	if got, want := md.Type, RegularTable; got != want {
273		t.Errorf("metadata.Type: got %v, want %v", got, want)
274	}
275	if got, want := md.ExpirationTime, testTableExpiration; !got.Equal(want) {
276		t.Errorf("metadata.Type: got %v, want %v", got, want)
277	}
278
279	// Check that timePartitioning is nil by default
280	if md.TimePartitioning != nil {
281		t.Errorf("metadata.TimePartitioning: got %v, want %v", md.TimePartitioning, nil)
282	}
283
284	// Create tables that have time partitioning
285	partitionCases := []struct {
286		timePartitioning TimePartitioning
287		wantExpiration   time.Duration
288		wantField        string
289	}{
290		{TimePartitioning{}, time.Duration(0), ""},
291		{TimePartitioning{Expiration: time.Second}, time.Second, ""},
292		{
293			TimePartitioning{
294				Expiration: time.Second,
295				Field:      "date",
296			}, time.Second, "date"},
297	}
298
299	schema2 := Schema{
300		{Name: "name", Type: StringFieldType},
301		{Name: "date", Type: DateFieldType},
302	}
303
304	clustering := &Clustering{
305		Fields: []string{"name"},
306	}
307
308	// Currently, clustering depends on partitioning.  Interleave testing of the two features.
309	for i, c := range partitionCases {
310		table := dataset.Table(fmt.Sprintf("t_metadata_partition_nocluster_%v", i))
311		clusterTable := dataset.Table(fmt.Sprintf("t_metadata_partition_cluster_%v", i))
312
313		// Create unclustered, partitioned variant and get metadata.
314		err = table.Create(context.Background(), &TableMetadata{
315			Schema:           schema2,
316			TimePartitioning: &c.timePartitioning,
317			ExpirationTime:   testTableExpiration,
318		})
319		if err != nil {
320			t.Fatal(err)
321		}
322		defer table.Delete(ctx)
323		md, err := table.Metadata(ctx)
324		if err != nil {
325			t.Fatal(err)
326		}
327
328		// Created clustered table and get metadata.
329		err = clusterTable.Create(context.Background(), &TableMetadata{
330			Schema:           schema2,
331			TimePartitioning: &c.timePartitioning,
332			ExpirationTime:   testTableExpiration,
333			Clustering:       clustering,
334		})
335		clusterMD, err := clusterTable.Metadata(ctx)
336		if err != nil {
337			t.Fatal(err)
338		}
339
340		for _, v := range []*TableMetadata{md, clusterMD} {
341			got := v.TimePartitioning
342			want := &TimePartitioning{
343				Expiration: c.wantExpiration,
344				Field:      c.wantField,
345			}
346			if !testutil.Equal(got, want) {
347				t.Errorf("metadata.TimePartitioning: got %v, want %v", got, want)
348			}
349		}
350
351		if md.Clustering != nil {
352			t.Errorf("metadata.Clustering was not nil on unclustered table %s", table.TableID)
353		}
354		got := clusterMD.Clustering
355		want := clustering
356		if clusterMD.Clustering != clustering {
357			if !testutil.Equal(got, want) {
358				t.Errorf("metadata.Clustering: got %v, want %v", got, want)
359			}
360		}
361	}
362
363}
364
365func TestIntegration_DatasetCreate(t *testing.T) {
366	if client == nil {
367		t.Skip("Integration tests skipped")
368	}
369	ctx := context.Background()
370	ds := client.Dataset(datasetIDs.New())
371	wmd := &DatasetMetadata{Name: "name", Location: "EU"}
372	err := ds.Create(ctx, wmd)
373	if err != nil {
374		t.Fatal(err)
375	}
376	gmd, err := ds.Metadata(ctx)
377	if err != nil {
378		t.Fatal(err)
379	}
380	if got, want := gmd.Name, wmd.Name; got != want {
381		t.Errorf("name: got %q, want %q", got, want)
382	}
383	if got, want := gmd.Location, wmd.Location; got != want {
384		t.Errorf("location: got %q, want %q", got, want)
385	}
386	if err := ds.Delete(ctx); err != nil {
387		t.Fatalf("deleting dataset %v: %v", ds, err)
388	}
389}
390
391func TestIntegration_DatasetMetadata(t *testing.T) {
392	if client == nil {
393		t.Skip("Integration tests skipped")
394	}
395	ctx := context.Background()
396	md, err := dataset.Metadata(ctx)
397	if err != nil {
398		t.Fatal(err)
399	}
400	if got, want := md.FullID, fmt.Sprintf("%s:%s", dataset.ProjectID, dataset.DatasetID); got != want {
401		t.Errorf("FullID: got %q, want %q", got, want)
402	}
403	jan2016 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
404	if md.CreationTime.Before(jan2016) {
405		t.Errorf("CreationTime: got %s, want > 2016-1-1", md.CreationTime)
406	}
407	if md.LastModifiedTime.Before(jan2016) {
408		t.Errorf("LastModifiedTime: got %s, want > 2016-1-1", md.LastModifiedTime)
409	}
410
411	// Verify that we get a NotFound for a nonexistent dataset.
412	_, err = client.Dataset("does_not_exist").Metadata(ctx)
413	if err == nil || !hasStatusCode(err, http.StatusNotFound) {
414		t.Errorf("got %v, want NotFound error", err)
415	}
416}
417
418func TestIntegration_DatasetDelete(t *testing.T) {
419	if client == nil {
420		t.Skip("Integration tests skipped")
421	}
422	ctx := context.Background()
423	ds := client.Dataset(datasetIDs.New())
424	if err := ds.Create(ctx, nil); err != nil {
425		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
426	}
427	if err := ds.Delete(ctx); err != nil {
428		t.Fatalf("deleting dataset %s: %v", ds.DatasetID, err)
429	}
430}
431
432func TestIntegration_DatasetDeleteWithContents(t *testing.T) {
433	if client == nil {
434		t.Skip("Integration tests skipped")
435	}
436	ctx := context.Background()
437	ds := client.Dataset(datasetIDs.New())
438	if err := ds.Create(ctx, nil); err != nil {
439		t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
440	}
441	table := ds.Table(tableIDs.New())
442	if err := table.Create(ctx, nil); err != nil {
443		t.Fatalf("creating table %s in dataset %s: %v", table.TableID, table.DatasetID, err)
444	}
445	// We expect failure here
446	if err := ds.Delete(ctx); err == nil {
447		t.Fatalf("non-recursive delete of dataset %s succeeded unexpectedly.", ds.DatasetID)
448	}
449	if err := ds.DeleteWithContents(ctx); err != nil {
450		t.Fatalf("deleting recursively dataset %s: %v", ds.DatasetID, err)
451	}
452}
453
454func TestIntegration_DatasetUpdateETags(t *testing.T) {
455	if client == nil {
456		t.Skip("Integration tests skipped")
457	}
458
459	check := func(md *DatasetMetadata, wantDesc, wantName string) {
460		if md.Description != wantDesc {
461			t.Errorf("description: got %q, want %q", md.Description, wantDesc)
462		}
463		if md.Name != wantName {
464			t.Errorf("name: got %q, want %q", md.Name, wantName)
465		}
466	}
467
468	ctx := context.Background()
469	md, err := dataset.Metadata(ctx)
470	if err != nil {
471		t.Fatal(err)
472	}
473	if md.ETag == "" {
474		t.Fatal("empty ETag")
475	}
476	// Write without ETag succeeds.
477	desc := md.Description + "d2"
478	name := md.Name + "n2"
479	md2, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: desc, Name: name}, "")
480	if err != nil {
481		t.Fatal(err)
482	}
483	check(md2, desc, name)
484
485	// Write with original ETag fails because of intervening write.
486	_, err = dataset.Update(ctx, DatasetMetadataToUpdate{Description: "d", Name: "n"}, md.ETag)
487	if err == nil {
488		t.Fatal("got nil, want error")
489	}
490
491	// Write with most recent ETag succeeds.
492	md3, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: "", Name: ""}, md2.ETag)
493	if err != nil {
494		t.Fatal(err)
495	}
496	check(md3, "", "")
497}
498
499func TestIntegration_DatasetUpdateDefaultExpiration(t *testing.T) {
500	if client == nil {
501		t.Skip("Integration tests skipped")
502	}
503	ctx := context.Background()
504	md, err := dataset.Metadata(ctx)
505	if err != nil {
506		t.Fatal(err)
507	}
508	// Set the default expiration time.
509	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Hour}, "")
510	if err != nil {
511		t.Fatal(err)
512	}
513	if md.DefaultTableExpiration != time.Hour {
514		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
515	}
516	// Omitting DefaultTableExpiration doesn't change it.
517	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{Name: "xyz"}, "")
518	if err != nil {
519		t.Fatal(err)
520	}
521	if md.DefaultTableExpiration != time.Hour {
522		t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
523	}
524	// Setting it to 0 deletes it (which looks like a 0 duration).
525	md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Duration(0)}, "")
526	if err != nil {
527		t.Fatal(err)
528	}
529	if md.DefaultTableExpiration != 0 {
530		t.Fatalf("got %s, want 0", md.DefaultTableExpiration)
531	}
532}
533
534func TestIntegration_DatasetUpdateAccess(t *testing.T) {
535	if client == nil {
536		t.Skip("Integration tests skipped")
537	}
538	ctx := context.Background()
539	md, err := dataset.Metadata(ctx)
540	if err != nil {
541		t.Fatal(err)
542	}
543	origAccess := append([]*AccessEntry(nil), md.Access...)
544	newEntry := &AccessEntry{
545		Role:       ReaderRole,
546		Entity:     "Joe@example.com",
547		EntityType: UserEmailEntity,
548	}
549	newAccess := append(md.Access, newEntry)
550	dm := DatasetMetadataToUpdate{Access: newAccess}
551	md, err = dataset.Update(ctx, dm, md.ETag)
552	if err != nil {
553		t.Fatal(err)
554	}
555	defer func() {
556		_, err := dataset.Update(ctx, DatasetMetadataToUpdate{Access: origAccess}, md.ETag)
557		if err != nil {
558			t.Log("could not restore dataset access list")
559		}
560	}()
561	if diff := testutil.Diff(md.Access, newAccess); diff != "" {
562		t.Fatalf("got=-, want=+:\n%s", diff)
563	}
564}
565
566func TestIntegration_DatasetUpdateLabels(t *testing.T) {
567	if client == nil {
568		t.Skip("Integration tests skipped")
569	}
570	ctx := context.Background()
571	md, err := dataset.Metadata(ctx)
572	if err != nil {
573		t.Fatal(err)
574	}
575	var dm DatasetMetadataToUpdate
576	dm.SetLabel("label", "value")
577	md, err = dataset.Update(ctx, dm, "")
578	if err != nil {
579		t.Fatal(err)
580	}
581	if got, want := md.Labels["label"], "value"; got != want {
582		t.Errorf("got %q, want %q", got, want)
583	}
584	dm = DatasetMetadataToUpdate{}
585	dm.DeleteLabel("label")
586	md, err = dataset.Update(ctx, dm, "")
587	if err != nil {
588		t.Fatal(err)
589	}
590	if _, ok := md.Labels["label"]; ok {
591		t.Error("label still present after deletion")
592	}
593}
594
595func TestIntegration_TableUpdateLabels(t *testing.T) {
596	if client == nil {
597		t.Skip("Integration tests skipped")
598	}
599	ctx := context.Background()
600	table := newTable(t, schema)
601	defer table.Delete(ctx)
602
603	var tm TableMetadataToUpdate
604	tm.SetLabel("label", "value")
605	md, err := table.Update(ctx, tm, "")
606	if err != nil {
607		t.Fatal(err)
608	}
609	if got, want := md.Labels["label"], "value"; got != want {
610		t.Errorf("got %q, want %q", got, want)
611	}
612	tm = TableMetadataToUpdate{}
613	tm.DeleteLabel("label")
614	md, err = table.Update(ctx, tm, "")
615	if err != nil {
616		t.Fatal(err)
617	}
618	if _, ok := md.Labels["label"]; ok {
619		t.Error("label still present after deletion")
620	}
621}
622
623func TestIntegration_Tables(t *testing.T) {
624	if client == nil {
625		t.Skip("Integration tests skipped")
626	}
627	ctx := context.Background()
628	table := newTable(t, schema)
629	defer table.Delete(ctx)
630	wantName := table.FullyQualifiedName()
631
632	// This test is flaky due to eventual consistency.
633	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
634	defer cancel()
635	err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
636		// Iterate over tables in the dataset.
637		it := dataset.Tables(ctx)
638		var tableNames []string
639		for {
640			tbl, err := it.Next()
641			if err == iterator.Done {
642				break
643			}
644			if err != nil {
645				return false, err
646			}
647			tableNames = append(tableNames, tbl.FullyQualifiedName())
648		}
649		// Other tests may be running with this dataset, so there might be more
650		// than just our table in the list. So don't try for an exact match; just
651		// make sure that our table is there somewhere.
652		for _, tn := range tableNames {
653			if tn == wantName {
654				return true, nil
655			}
656		}
657		return false, fmt.Errorf("got %v\nwant %s in the list", tableNames, wantName)
658	})
659	if err != nil {
660		t.Fatal(err)
661	}
662}
663
// TestIntegration_UploadAndRead uploads three rows with ValuesSaver and reads
// them back in several ways: a direct table read, a query, a job looked up by
// ID, iteration into []Value, and iteration into map[string]Value.
func TestIntegration_UploadAndRead(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Populate the table.
	upl := table.Uploader()
	var (
		wantRows  [][]Value
		saverRows []*ValuesSaver
	)
	// Each row has a name, a one-element nums list holding the loop index,
	// and a rec record whose single bool field is true.
	for i, name := range []string{"a", "b", "c"} {
		row := []Value{name, []Value{int64(i)}, []Value{true}}
		wantRows = append(wantRows, row)
		saverRows = append(saverRows, &ValuesSaver{
			Schema:   schema,
			InsertID: name,
			Row:      row,
		})
	}
	if err := upl.Put(ctx, saverRows); err != nil {
		t.Fatal(putError(err))
	}

	// Wait until the data has been uploaded. This can take a few seconds, according
	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}
	// Read the table.
	checkRead(t, "upload", table.Read(ctx), wantRows)

	// Query the table.
	q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID))
	// The query names the table without qualification, so supply defaults.
	q.DefaultProjectID = dataset.ProjectID
	q.DefaultDatasetID = dataset.DatasetID

	rit, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "query", rit, wantRows)

	// Query the long way.
	job1, err := q.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if job1.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	// Look the same job up again by its ID and read the results through it.
	job2, err := client.JobFromID(ctx, job1.ID())
	if err != nil {
		t.Fatal(err)
	}
	if job2.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	rit, err = job2.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "job.Read", rit, wantRows)

	// Get statistics.
	jobStatus, err := job2.Status(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if jobStatus.Statistics == nil {
		t.Fatal("jobStatus missing statistics")
	}
	if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok {
		t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details)
	}

	// Test reading directly into a []Value.
	// Note: from here on, schema is a new local that shadows the package-level
	// schema; it holds the schema returned by readAll.
	valueLists, schema, _, err := readAll(table.Read(ctx))
	if err != nil {
		t.Fatal(err)
	}
	it := table.Read(ctx)
	for i, vl := range valueLists {
		var got []Value
		if err := it.Next(&got); err != nil {
			t.Fatal(err)
		}
		if !testutil.Equal(it.Schema, schema) {
			t.Fatalf("got schema %v, want %v", it.Schema, schema)
		}
		want := []Value(vl)
		if !testutil.Equal(got, want) {
			t.Errorf("%d: got %v, want %v", i, got, want)
		}
	}

	// Test reading into a map.
	it = table.Read(ctx)
	for _, vl := range valueLists {
		var vm map[string]Value
		if err := it.Next(&vm); err != nil {
			t.Fatal(err)
		}
		if got, want := len(vm), len(vl); got != want {
			t.Fatalf("valueMap len: got %d, want %d", got, want)
		}
		// With maps, structs become nested maps.
		// This rewrites the expected row in place (valueLists is modified).
		vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]}
		for i, v := range vl {
			if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) {
				t.Errorf("%d, name=%s: got %#v, want %#v",
					i, schema[i].Name, got, want)
			}
		}
	}

}
784
// SubSubTestStruct is the innermost record type used by SubTestStruct.
type SubSubTestStruct struct {
	Integer int64
}
788
// SubTestStruct is the record type nested inside TestStruct. It contains a
// scalar, a nested record, and a slice of nested records.
type SubTestStruct struct {
	String      string
	Record      SubSubTestStruct
	RecordArray []SubSubTestStruct
}
794
// TestStruct is the row type used by TestIntegration_UploadAndReadStructs,
// which infers a table schema from it with InferSchema.
//
// Do not reorder the fields: that test builds a value with an unkeyed
// (positional) composite literal.
type TestStruct struct {
	// Scalar fields.
	Name      string
	Bytes     []byte
	Integer   int64
	Float     float64
	Boolean   bool
	Timestamp time.Time
	Date      civil.Date
	Time      civil.Time
	DateTime  civil.DateTime
	Numeric   *big.Rat

	// Slice counterparts of the scalar fields.
	StringArray    []string
	IntegerArray   []int64
	FloatArray     []float64
	BooleanArray   []bool
	TimestampArray []time.Time
	DateArray      []civil.Date
	TimeArray      []civil.Time
	DateTimeArray  []civil.DateTime
	NumericArray   []*big.Rat

	// Nested records.
	Record      SubTestStruct
	RecordArray []SubTestStruct
}
820
821// Round times to the microsecond for comparison purposes.
822var roundToMicros = cmp.Transformer("RoundToMicros",
823	func(t time.Time) time.Time { return t.Round(time.Microsecond) })
824
// TestIntegration_UploadAndReadStructs uploads two TestStruct values with
// StructSaver into a table whose schema is inferred from TestStruct, reads the
// rows back into TestStruct values, and compares them with what was uploaded.
func TestIntegration_UploadAndReadStructs(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	// This local schema shadows the package-level schema.
	schema, err := InferSchema(TestStruct{})
	if err != nil {
		t.Fatal(err)
	}

	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Two sets of date/time fixtures. Within each set the civil date, civil
	// time, timestamp and civil datetime describe the same instant.
	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
	ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC)
	dtm := civil.DateTime{Date: d, Time: tm}
	d2 := civil.Date{Year: 1994, Month: 5, Day: 15}
	tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0}
	ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC)
	dtm2 := civil.DateTime{Date: d2, Time: tm2}

	// Populate the table.
	upl := table.Uploader()
	want := []*TestStruct{
		// Fully populated row. The literal is positional, so the values must
		// follow the field order of TestStruct.
		{
			"a",
			[]byte("byte"),
			42,
			3.14,
			true,
			ts,
			d,
			tm,
			dtm,
			big.NewRat(57, 100),
			[]string{"a", "b"},
			[]int64{1, 2},
			[]float64{1, 1.41},
			[]bool{true, false},
			[]time.Time{ts, ts2},
			[]civil.Date{d, d2},
			[]civil.Time{tm, tm2},
			[]civil.DateTime{dtm, dtm2},
			[]*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)},
			SubTestStruct{
				"string",
				SubSubTestStruct{24},
				[]SubSubTestStruct{{1}, {2}},
			},
			[]SubTestStruct{
				{String: "empty"},
				{
					"full",
					SubSubTestStruct{1},
					[]SubSubTestStruct{{1}, {2}},
				},
			},
		},
		// Row with only the scalar fields set; slices and records are left at
		// their zero values.
		{
			Name:      "b",
			Bytes:     []byte("byte2"),
			Integer:   24,
			Float:     4.13,
			Boolean:   false,
			Timestamp: ts,
			Date:      d,
			Time:      tm,
			DateTime:  dtm,
			Numeric:   big.NewRat(4499, 10000),
		},
	}
	var savers []*StructSaver
	for _, s := range want {
		savers = append(savers, &StructSaver{Schema: schema, Struct: s})
	}
	if err := upl.Put(ctx, savers); err != nil {
		t.Fatal(putError(err))
	}

	// Wait until the data has been uploaded. This can take a few seconds, according
	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	// Test iteration with structs.
	it := table.Read(ctx)
	var got []*TestStruct
	for {
		var g TestStruct
		err := it.Next(&g)
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		got = append(got, &g)
	}
	// Sort by Name so that got lines up with want ("a" before "b").
	sort.Sort(byName(got))

	// BigQuery does not elide nils. It reports an error for nil fields.
	// NOTE(review): this loop ranges over got only, so it reports extra rows
	// but not missing ones (len(got) < len(want)) — confirm whether that is
	// intended.
	for i, g := range got {
		if i >= len(want) {
			t.Errorf("%d: got %v, past end of want", i, pretty.Value(g))
		} else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" {
			t.Errorf("%d: got=-, want=+:\n%s", i, diff)
		}
	}
}
936
937type byName []*TestStruct
938
939func (b byName) Len() int           { return len(b) }
940func (b byName) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
941func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name }
942
943func TestIntegration_UploadAndReadNullable(t *testing.T) {
944	if client == nil {
945		t.Skip("Integration tests skipped")
946	}
947	ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
948	cdt := civil.DateTime{Date: testDate, Time: ctm}
949	rat := big.NewRat(33, 100)
950	testUploadAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema)))
951	testUploadAndReadNullable(t, testStructNullable{
952		String:    NullString{"x", true},
953		Bytes:     []byte{1, 2, 3},
954		Integer:   NullInt64{1, true},
955		Float:     NullFloat64{2.3, true},
956		Boolean:   NullBool{true, true},
957		Timestamp: NullTimestamp{testTimestamp, true},
958		Date:      NullDate{testDate, true},
959		Time:      NullTime{ctm, true},
960		DateTime:  NullDateTime{cdt, true},
961		Numeric:   rat,
962		Record:    &subNullable{X: NullInt64{4, true}},
963	},
964		[]Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, []Value{int64(4)}})
965}
966
967func testUploadAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
968	ctx := context.Background()
969	table := newTable(t, testStructNullableSchema)
970	defer table.Delete(ctx)
971
972	// Populate the table.
973	upl := table.Uploader()
974	if err := upl.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil {
975		t.Fatal(putError(err))
976	}
977	// Wait until the data has been uploaded. This can take a few seconds, according
978	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
979	if err := waitForRow(ctx, table); err != nil {
980		t.Fatal(err)
981	}
982
983	// Read into a []Value.
984	iter := table.Read(ctx)
985	gotRows, _, _, err := readAll(iter)
986	if err != nil {
987		t.Fatal(err)
988	}
989	if len(gotRows) != 1 {
990		t.Fatalf("got %d rows, want 1", len(gotRows))
991	}
992	if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" {
993		t.Error(diff)
994	}
995
996	// Read into a struct.
997	want := ts
998	var sn testStructNullable
999	it := table.Read(ctx)
1000	if err := it.Next(&sn); err != nil {
1001		t.Fatal(err)
1002	}
1003	if diff := testutil.Diff(sn, want, roundToMicros); diff != "" {
1004		t.Error(diff)
1005	}
1006}
1007
1008func TestIntegration_TableUpdate(t *testing.T) {
1009	if client == nil {
1010		t.Skip("Integration tests skipped")
1011	}
1012	ctx := context.Background()
1013	table := newTable(t, schema)
1014	defer table.Delete(ctx)
1015
1016	// Test Update of non-schema fields.
1017	tm, err := table.Metadata(ctx)
1018	if err != nil {
1019		t.Fatal(err)
1020	}
1021	wantDescription := tm.Description + "more"
1022	wantName := tm.Name + "more"
1023	wantExpiration := tm.ExpirationTime.Add(time.Hour * 24)
1024	got, err := table.Update(ctx, TableMetadataToUpdate{
1025		Description:    wantDescription,
1026		Name:           wantName,
1027		ExpirationTime: wantExpiration,
1028	}, tm.ETag)
1029	if err != nil {
1030		t.Fatal(err)
1031	}
1032	if got.Description != wantDescription {
1033		t.Errorf("Description: got %q, want %q", got.Description, wantDescription)
1034	}
1035	if got.Name != wantName {
1036		t.Errorf("Name: got %q, want %q", got.Name, wantName)
1037	}
1038	if got.ExpirationTime != wantExpiration {
1039		t.Errorf("ExpirationTime: got %q, want %q", got.ExpirationTime, wantExpiration)
1040	}
1041	if !testutil.Equal(got.Schema, schema) {
1042		t.Errorf("Schema: got %v, want %v", pretty.Value(got.Schema), pretty.Value(schema))
1043	}
1044
1045	// Blind write succeeds.
1046	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "x"}, "")
1047	if err != nil {
1048		t.Fatal(err)
1049	}
1050	// Write with old etag fails.
1051	_, err = table.Update(ctx, TableMetadataToUpdate{Name: "y"}, got.ETag)
1052	if err == nil {
1053		t.Fatal("Update with old ETag succeeded, wanted failure")
1054	}
1055
1056	// Test schema update.
1057	// Columns can be added. schema2 is the same as schema, except for the
1058	// added column in the middle.
1059	nested := Schema{
1060		{Name: "nested", Type: BooleanFieldType},
1061		{Name: "other", Type: StringFieldType},
1062	}
1063	schema2 := Schema{
1064		schema[0],
1065		{Name: "rec2", Type: RecordFieldType, Schema: nested},
1066		schema[1],
1067		schema[2],
1068	}
1069
1070	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: schema2}, "")
1071	if err != nil {
1072		t.Fatal(err)
1073	}
1074
1075	// Wherever you add the column, it appears at the end.
1076	schema3 := Schema{schema2[0], schema2[2], schema2[3], schema2[1]}
1077	if !testutil.Equal(got.Schema, schema3) {
1078		t.Errorf("add field:\ngot  %v\nwant %v",
1079			pretty.Value(got.Schema), pretty.Value(schema3))
1080	}
1081
1082	// Updating with the empty schema succeeds, but is a no-op.
1083	got, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema{}}, "")
1084	if err != nil {
1085		t.Fatal(err)
1086	}
1087	if !testutil.Equal(got.Schema, schema3) {
1088		t.Errorf("empty schema:\ngot  %v\nwant %v",
1089			pretty.Value(got.Schema), pretty.Value(schema3))
1090	}
1091
1092	// Error cases when updating schema.
1093	for _, test := range []struct {
1094		desc   string
1095		fields Schema
1096	}{
1097		{"change from optional to required", Schema{
1098			{Name: "name", Type: StringFieldType, Required: true},
1099			schema3[1],
1100			schema3[2],
1101			schema3[3],
1102		}},
1103		{"add a required field", Schema{
1104			schema3[0], schema3[1], schema3[2], schema3[3],
1105			{Name: "req", Type: StringFieldType, Required: true},
1106		}},
1107		{"remove a field", Schema{schema3[0], schema3[1], schema3[2]}},
1108		{"remove a nested field", Schema{
1109			schema3[0], schema3[1], schema3[2],
1110			{Name: "rec2", Type: RecordFieldType, Schema: Schema{nested[0]}}}},
1111		{"remove all nested fields", Schema{
1112			schema3[0], schema3[1], schema3[2],
1113			{Name: "rec2", Type: RecordFieldType, Schema: Schema{}}}},
1114	} {
1115		_, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema(test.fields)}, "")
1116		if err == nil {
1117			t.Errorf("%s: want error, got nil", test.desc)
1118		} else if !hasStatusCode(err, 400) {
1119			t.Errorf("%s: want 400, got %v", test.desc, err)
1120		}
1121	}
1122}
1123
1124func TestIntegration_Load(t *testing.T) {
1125	if client == nil {
1126		t.Skip("Integration tests skipped")
1127	}
1128	ctx := context.Background()
1129	// CSV data can't be loaded into a repeated field, so we use a different schema.
1130	table := newTable(t, Schema{
1131		{Name: "name", Type: StringFieldType},
1132		{Name: "nums", Type: IntegerFieldType},
1133	})
1134	defer table.Delete(ctx)
1135
1136	// Load the table from a reader.
1137	r := strings.NewReader("a,0\nb,1\nc,2\n")
1138	wantRows := [][]Value{
1139		{"a", int64(0)},
1140		{"b", int64(1)},
1141		{"c", int64(2)},
1142	}
1143	rs := NewReaderSource(r)
1144	loader := table.LoaderFrom(rs)
1145	loader.WriteDisposition = WriteTruncate
1146	loader.Labels = map[string]string{"test": "go"}
1147	job, err := loader.Run(ctx)
1148	if err != nil {
1149		t.Fatal(err)
1150	}
1151	if job.LastStatus() == nil {
1152		t.Error("no LastStatus")
1153	}
1154	conf, err := job.Config()
1155	if err != nil {
1156		t.Fatal(err)
1157	}
1158	config, ok := conf.(*LoadConfig)
1159	if !ok {
1160		t.Fatalf("got %T, want LoadConfig", conf)
1161	}
1162	diff := testutil.Diff(config, &loader.LoadConfig,
1163		cmp.AllowUnexported(Table{}),
1164		cmpopts.IgnoreUnexported(Client{}, ReaderSource{}),
1165		// returned schema is at top level, not in the config
1166		cmpopts.IgnoreFields(FileConfig{}, "Schema"))
1167	if diff != "" {
1168		t.Errorf("got=-, want=+:\n%s", diff)
1169	}
1170	if err := wait(ctx, job); err != nil {
1171		t.Fatal(err)
1172	}
1173	checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows)
1174
1175}
1176
1177func TestIntegration_DML(t *testing.T) {
1178	if client == nil {
1179		t.Skip("Integration tests skipped")
1180	}
1181	ctx := context.Background()
1182	table := newTable(t, schema)
1183	defer table.Delete(ctx)
1184
1185	sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec)
1186						VALUES ('a', [0], STRUCT<BOOL>(TRUE)),
1187							   ('b', [1], STRUCT<BOOL>(FALSE)),
1188							   ('c', [2], STRUCT<BOOL>(TRUE))`,
1189		table.DatasetID, table.TableID)
1190	if err := runDML(ctx, sql); err != nil {
1191		t.Fatal(err)
1192	}
1193	wantRows := [][]Value{
1194		{"a", []Value{int64(0)}, []Value{true}},
1195		{"b", []Value{int64(1)}, []Value{false}},
1196		{"c", []Value{int64(2)}, []Value{true}},
1197	}
1198	checkRead(t, "DML", table.Read(ctx), wantRows)
1199}
1200
1201func runDML(ctx context.Context, sql string) error {
1202	// Retry insert; sometimes it fails with INTERNAL.
1203	return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
1204		ri, err := client.Query(sql).Read(ctx)
1205		if err != nil {
1206			if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
1207				return true, err // fail on 4xx
1208			}
1209			return false, err
1210		}
1211		// It is OK to try to iterate over DML results. The first call to Next
1212		// will return iterator.Done.
1213		err = ri.Next(nil)
1214		if err == nil {
1215			return true, errors.New("want iterator.Done on the first call, got nil")
1216		}
1217		if err == iterator.Done {
1218			return true, nil
1219		}
1220		if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
1221			return true, err // fail on 4xx
1222		}
1223		return false, err
1224	})
1225}
1226
1227func TestIntegration_TimeTypes(t *testing.T) {
1228	if client == nil {
1229		t.Skip("Integration tests skipped")
1230	}
1231	ctx := context.Background()
1232	dtSchema := Schema{
1233		{Name: "d", Type: DateFieldType},
1234		{Name: "t", Type: TimeFieldType},
1235		{Name: "dt", Type: DateTimeFieldType},
1236		{Name: "ts", Type: TimestampFieldType},
1237	}
1238	table := newTable(t, dtSchema)
1239	defer table.Delete(ctx)
1240
1241	d := civil.Date{Year: 2016, Month: 3, Day: 20}
1242	tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000}
1243	dtm := civil.DateTime{Date: d, Time: tm}
1244	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1245	wantRows := [][]Value{
1246		{d, tm, dtm, ts},
1247	}
1248	upl := table.Uploader()
1249	if err := upl.Put(ctx, []*ValuesSaver{
1250		{Schema: dtSchema, Row: wantRows[0]},
1251	}); err != nil {
1252		t.Fatal(putError(err))
1253	}
1254	if err := waitForRow(ctx, table); err != nil {
1255		t.Fatal(err)
1256	}
1257
1258	// SQL wants DATETIMEs with a space between date and time, but the service
1259	// returns them in RFC3339 form, with a "T" between.
1260	query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+
1261		"VALUES ('%s', '%s', '%s', '%s')",
1262		table.DatasetID, table.TableID,
1263		d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
1264	if err := runDML(ctx, query); err != nil {
1265		t.Fatal(err)
1266	}
1267	wantRows = append(wantRows, wantRows[0])
1268	checkRead(t, "TimeTypes", table.Read(ctx), wantRows)
1269}
1270
1271func TestIntegration_StandardQuery(t *testing.T) {
1272	if client == nil {
1273		t.Skip("Integration tests skipped")
1274	}
1275	ctx := context.Background()
1276
1277	d := civil.Date{Year: 2016, Month: 3, Day: 20}
1278	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0}
1279	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1280	dtm := ts.Format("2006-01-02 15:04:05")
1281
1282	// Constructs Value slices made up of int64s.
1283	ints := func(args ...int) []Value {
1284		vals := make([]Value, len(args))
1285		for i, arg := range args {
1286			vals[i] = int64(arg)
1287		}
1288		return vals
1289	}
1290
1291	testCases := []struct {
1292		query   string
1293		wantRow []Value
1294	}{
1295		{"SELECT 1", ints(1)},
1296		{"SELECT 1.3", []Value{1.3}},
1297		{"SELECT CAST(1.3  AS NUMERIC)", []Value{big.NewRat(13, 10)}},
1298		{"SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}},
1299		{"SELECT TRUE", []Value{true}},
1300		{"SELECT 'ABC'", []Value{"ABC"}},
1301		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
1302		{fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}},
1303		{fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}},
1304		{fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}},
1305		{fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}},
1306		{fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}},
1307		{fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}},
1308		{"SELECT (1, 2)", []Value{ints(1, 2)}},
1309		{"SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}},
1310		{"SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}},
1311		{"SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}},
1312		{"SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}},
1313		{"SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}},
1314	}
1315	for _, c := range testCases {
1316		q := client.Query(c.query)
1317		it, err := q.Read(ctx)
1318		if err != nil {
1319			t.Fatal(err)
1320		}
1321		checkRead(t, "StandardQuery", it, [][]Value{c.wantRow})
1322	}
1323}
1324
1325func TestIntegration_LegacyQuery(t *testing.T) {
1326	if client == nil {
1327		t.Skip("Integration tests skipped")
1328	}
1329	ctx := context.Background()
1330
1331	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1332	dtm := ts.Format("2006-01-02 15:04:05")
1333
1334	testCases := []struct {
1335		query   string
1336		wantRow []Value
1337	}{
1338		{"SELECT 1", []Value{int64(1)}},
1339		{"SELECT 1.3", []Value{1.3}},
1340		{"SELECT TRUE", []Value{true}},
1341		{"SELECT 'ABC'", []Value{"ABC"}},
1342		{"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
1343		{fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}},
1344		{fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}},
1345		{fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}},
1346	}
1347	for _, c := range testCases {
1348		q := client.Query(c.query)
1349		q.UseLegacySQL = true
1350		it, err := q.Read(ctx)
1351		if err != nil {
1352			t.Fatal(err)
1353		}
1354		checkRead(t, "LegacyQuery", it, [][]Value{c.wantRow})
1355	}
1356}
1357
1358func TestIntegration_QueryParameters(t *testing.T) {
1359	if client == nil {
1360		t.Skip("Integration tests skipped")
1361	}
1362	ctx := context.Background()
1363
1364	d := civil.Date{Year: 2016, Month: 3, Day: 20}
1365	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
1366	rtm := tm
1367	rtm.Nanosecond = 3000 // round to microseconds
1368	dtm := civil.DateTime{Date: d, Time: tm}
1369	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1370	rat := big.NewRat(13, 10)
1371
1372	type ss struct {
1373		String string
1374	}
1375
1376	type s struct {
1377		Timestamp      time.Time
1378		StringArray    []string
1379		SubStruct      ss
1380		SubStructArray []ss
1381	}
1382
1383	testCases := []struct {
1384		query      string
1385		parameters []QueryParameter
1386		wantRow    []Value
1387		wantConfig interface{}
1388	}{
1389		{
1390			"SELECT @val",
1391			[]QueryParameter{{"val", 1}},
1392			[]Value{int64(1)},
1393			int64(1),
1394		},
1395		{
1396			"SELECT @val",
1397			[]QueryParameter{{"val", 1.3}},
1398			[]Value{1.3},
1399			1.3,
1400		},
1401		{
1402			"SELECT @val",
1403			[]QueryParameter{{"val", rat}},
1404			[]Value{rat},
1405			rat,
1406		},
1407		{
1408			"SELECT @val",
1409			[]QueryParameter{{"val", true}},
1410			[]Value{true},
1411			true,
1412		},
1413		{
1414			"SELECT @val",
1415			[]QueryParameter{{"val", "ABC"}},
1416			[]Value{"ABC"},
1417			"ABC",
1418		},
1419		{
1420			"SELECT @val",
1421			[]QueryParameter{{"val", []byte("foo")}},
1422			[]Value{[]byte("foo")},
1423			[]byte("foo"),
1424		},
1425		{
1426			"SELECT @val",
1427			[]QueryParameter{{"val", ts}},
1428			[]Value{ts},
1429			ts,
1430		},
1431		{
1432			"SELECT @val",
1433			[]QueryParameter{{"val", []time.Time{ts, ts}}},
1434			[]Value{[]Value{ts, ts}},
1435			[]interface{}{ts, ts},
1436		},
1437		{
1438			"SELECT @val",
1439			[]QueryParameter{{"val", dtm}},
1440			[]Value{civil.DateTime{Date: d, Time: rtm}},
1441			civil.DateTime{Date: d, Time: rtm},
1442		},
1443		{
1444			"SELECT @val",
1445			[]QueryParameter{{"val", d}},
1446			[]Value{d},
1447			d,
1448		},
1449		{
1450			"SELECT @val",
1451			[]QueryParameter{{"val", tm}},
1452			[]Value{rtm},
1453			rtm,
1454		},
1455		{
1456			"SELECT @val",
1457			[]QueryParameter{{"val", s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}},
1458			[]Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}},
1459			map[string]interface{}{
1460				"Timestamp":   ts,
1461				"StringArray": []interface{}{"a", "b"},
1462				"SubStruct":   map[string]interface{}{"String": "c"},
1463				"SubStructArray": []interface{}{
1464					map[string]interface{}{"String": "d"},
1465					map[string]interface{}{"String": "e"},
1466				},
1467			},
1468		},
1469		{
1470			"SELECT @val.Timestamp, @val.SubStruct.String",
1471			[]QueryParameter{{"val", s{Timestamp: ts, SubStruct: ss{"a"}}}},
1472			[]Value{ts, "a"},
1473			map[string]interface{}{
1474				"Timestamp":      ts,
1475				"SubStruct":      map[string]interface{}{"String": "a"},
1476				"StringArray":    nil,
1477				"SubStructArray": nil,
1478			},
1479		},
1480	}
1481	for _, c := range testCases {
1482		q := client.Query(c.query)
1483		q.Parameters = c.parameters
1484		job, err := q.Run(ctx)
1485		if err != nil {
1486			t.Fatal(err)
1487		}
1488		if job.LastStatus() == nil {
1489			t.Error("no LastStatus")
1490		}
1491		it, err := job.Read(ctx)
1492		if err != nil {
1493			t.Fatal(err)
1494		}
1495		checkRead(t, "QueryParameters", it, [][]Value{c.wantRow})
1496		config, err := job.Config()
1497		if err != nil {
1498			t.Fatal(err)
1499		}
1500		got := config.(*QueryConfig).Parameters[0].Value
1501		if !testutil.Equal(got, c.wantConfig) {
1502			t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
1503				c.parameters[0].Value, got, c.wantConfig)
1504		}
1505	}
1506}
1507
1508func TestIntegration_QueryDryRun(t *testing.T) {
1509	if client == nil {
1510		t.Skip("Integration tests skipped")
1511	}
1512	ctx := context.Background()
1513	q := client.Query("SELECT word from " + stdName + " LIMIT 10")
1514	q.DryRun = true
1515	job, err := q.Run(ctx)
1516	if err != nil {
1517		t.Fatal(err)
1518	}
1519
1520	s := job.LastStatus()
1521	if s.State != Done {
1522		t.Errorf("state is %v, expected Done", s.State)
1523	}
1524	if s.Statistics == nil {
1525		t.Fatal("no statistics")
1526	}
1527	if s.Statistics.Details.(*QueryStatistics).Schema == nil {
1528		t.Fatal("no schema")
1529	}
1530}
1531
// TestIntegration_ExtractExternal extracts a small table to a CSV object in
// GCS and then reads that object back in two ways: as an ad-hoc external
// table definition attached to a query, and as a table created with an
// ExternalDataConfig. It also checks the job's reported extract configuration
// and the external table's metadata.
func TestIntegration_ExtractExternal(t *testing.T) {
	// Create a table, extract it to GCS, then query it externally.
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	// This local schema shadows the package-level one for the rest of the test.
	schema := Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "num", Type: IntegerFieldType},
	}
	table := newTable(t, schema)
	// The deferred call is bound to the table created above: table is
	// reassigned further down, but defer evaluates its receiver here.
	defer table.Delete(ctx)

	// Insert table data.
	sql := fmt.Sprintf(`INSERT %s.%s (name, num)
		                VALUES ('a', 1), ('b', 2), ('c', 3)`,
		table.DatasetID, table.TableID)
	if err := runDML(ctx, sql); err != nil {
		t.Fatal(err)
	}
	// Extract to a GCS object as CSV.
	// The bucket name is taken from testutil.ProjID().
	bucketName := testutil.ProjID()
	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
	gr := NewGCSReference(uri)
	gr.DestinationFormat = CSV
	e := table.ExtractorTo(gr)
	job, err := e.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// The job must report back the same extract configuration it was given.
	conf, err := job.Config()
	if err != nil {
		t.Fatal(err)
	}
	config, ok := conf.(*ExtractConfig)
	if !ok {
		t.Fatalf("got %T, want ExtractConfig", conf)
	}
	diff := testutil.Diff(config, &e.ExtractConfig,
		cmp.AllowUnexported(Table{}),
		cmpopts.IgnoreUnexported(Client{}))
	if diff != "" {
		t.Errorf("got=-, want=+:\n%s", diff)
	}
	if err := wait(ctx, job); err != nil {
		t.Fatal(err)
	}

	// Describe the extracted object as an external CSV source; the first
	// line of the file is skipped when reading.
	edc := &ExternalDataConfig{
		SourceFormat: CSV,
		SourceURIs:   []string{uri},
		Schema:       schema,
		Options:      &CSVOptions{SkipLeadingRows: 1},
	}
	// Query that CSV file directly.
	q := client.Query("SELECT * FROM csv")
	q.TableDefinitions = map[string]ExternalData{"csv": edc}
	wantRows := [][]Value{
		{"a", int64(1)},
		{"b", int64(2)},
		{"c", int64(3)},
	}
	iter, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "external query", iter, wantRows)

	// Make a table pointing to the file, and query it.
	// BigQuery does not allow a Table.Read on an external table.
	// NOTE(review): this second table is not deleted explicitly anywhere in
	// this function; it is created with ExpirationTime set.
	table = dataset.Table(tableIDs.New())
	err = table.Create(context.Background(), &TableMetadata{
		Schema:             schema,
		ExpirationTime:     testTableExpiration,
		ExternalDataConfig: edc,
	})
	if err != nil {
		t.Fatal(err)
	}
	q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
	iter, err = q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkReadAndTotalRows(t, "external table", iter, wantRows)

	// While we're here, check that the table metadata is correct.
	md, err := table.Metadata(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// One difference: since BigQuery returns the schema as part of the ordinary
	// table metadata, it does not populate ExternalDataConfig.Schema.
	md.ExternalDataConfig.Schema = md.Schema
	if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" {
		t.Errorf("got=-, want=+\n%s", diff)
	}
}
1632
1633func TestIntegration_ReadNullIntoStruct(t *testing.T) {
1634	// Reading a null into a struct field should return an error (not panic).
1635	if client == nil {
1636		t.Skip("Integration tests skipped")
1637	}
1638	ctx := context.Background()
1639	table := newTable(t, schema)
1640	defer table.Delete(ctx)
1641
1642	upl := table.Uploader()
1643	row := &ValuesSaver{
1644		Schema: schema,
1645		Row:    []Value{nil, []Value{}, []Value{nil}},
1646	}
1647	if err := upl.Put(ctx, []*ValuesSaver{row}); err != nil {
1648		t.Fatal(putError(err))
1649	}
1650	if err := waitForRow(ctx, table); err != nil {
1651		t.Fatal(err)
1652	}
1653
1654	q := client.Query(fmt.Sprintf("select name from %s", table.TableID))
1655	q.DefaultProjectID = dataset.ProjectID
1656	q.DefaultDatasetID = dataset.DatasetID
1657	it, err := q.Read(ctx)
1658	if err != nil {
1659		t.Fatal(err)
1660	}
1661	type S struct{ Name string }
1662	var s S
1663	if err := it.Next(&s); err == nil {
1664		t.Fatal("got nil, want error")
1665	}
1666}
1667
// Names of the shakespeare table in the bigquery-public-data samples dataset.
// stdName is back-quoted with dot separators; legacyName is bracketed with a
// colon after the project.
const (
	stdName    = "`bigquery-public-data.samples.shakespeare`"
	legacyName = "[bigquery-public-data:samples.shakespeare]"
)
1672
// These tests exploit the fact that the two SQL versions have different syntaxes for
// fully-qualified table names: a table name written in one syntax is expected to
// fail under the other dialect. Each case names a table, sets the standard and
// legacy flags, and records whether an error is expected. Setting both flags is
// always expected to fail.
var useLegacySqlTests = []struct {
	t           string // name of table
	std, legacy bool   // use standard/legacy SQL
	err         bool   // do we expect an error?
}{
	{t: legacyName, std: false, legacy: true, err: false},
	{t: legacyName, std: true, legacy: false, err: true},
	{t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default
	{t: legacyName, std: true, legacy: true, err: true},
	{t: stdName, std: false, legacy: true, err: true},
	{t: stdName, std: true, legacy: false, err: false},
	{t: stdName, std: false, legacy: false, err: false}, // standard SQL is default
	{t: stdName, std: true, legacy: true, err: true},
}
1689
1690func TestIntegration_QueryUseLegacySQL(t *testing.T) {
1691	// Test the UseLegacySQL and UseStandardSQL options for queries.
1692	if client == nil {
1693		t.Skip("Integration tests skipped")
1694	}
1695	ctx := context.Background()
1696	for _, test := range useLegacySqlTests {
1697		q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t))
1698		q.UseStandardSQL = test.std
1699		q.UseLegacySQL = test.legacy
1700		_, err := q.Read(ctx)
1701		gotErr := err != nil
1702		if gotErr && !test.err {
1703			t.Errorf("%+v:\nunexpected error: %v", test, err)
1704		} else if !gotErr && test.err {
1705			t.Errorf("%+v:\nsucceeded, but want error", test)
1706		}
1707	}
1708}
1709
1710func TestIntegration_TableUseLegacySQL(t *testing.T) {
1711	// Test UseLegacySQL and UseStandardSQL for Table.Create.
1712	if client == nil {
1713		t.Skip("Integration tests skipped")
1714	}
1715	ctx := context.Background()
1716	table := newTable(t, schema)
1717	defer table.Delete(ctx)
1718	for i, test := range useLegacySqlTests {
1719		view := dataset.Table(fmt.Sprintf("t_view_%d", i))
1720		tm := &TableMetadata{
1721			ViewQuery:      fmt.Sprintf("SELECT word from %s", test.t),
1722			UseStandardSQL: test.std,
1723			UseLegacySQL:   test.legacy,
1724		}
1725		err := view.Create(ctx, tm)
1726		gotErr := err != nil
1727		if gotErr && !test.err {
1728			t.Errorf("%+v:\nunexpected error: %v", test, err)
1729		} else if !gotErr && test.err {
1730			t.Errorf("%+v:\nsucceeded, but want error", test)
1731		}
1732		_ = view.Delete(ctx)
1733	}
1734}
1735
1736func TestIntegration_ListJobs(t *testing.T) {
1737	// It's difficult to test the list of jobs, because we can't easily
1738	// control what's in it. Also, there are many jobs in the test project,
1739	// and it takes considerable time to list them all.
1740	if client == nil {
1741		t.Skip("Integration tests skipped")
1742	}
1743	ctx := context.Background()
1744
1745	// About all we can do is list a few jobs.
1746	const max = 20
1747	var jobs []*Job
1748	it := client.Jobs(ctx)
1749	for {
1750		job, err := it.Next()
1751		if err == iterator.Done {
1752			break
1753		}
1754		if err != nil {
1755			t.Fatal(err)
1756		}
1757		jobs = append(jobs, job)
1758		if len(jobs) >= max {
1759			break
1760		}
1761	}
1762	// We expect that there is at least one job in the last few months.
1763	if len(jobs) == 0 {
1764		t.Fatal("did not get any jobs")
1765	}
1766}
1767
// tokyo is the location that the location tests expect for the "tokyo"
// dataset and for the jobs that run against it.
const tokyo = "asia-northeast1"
1769
1770func TestIntegration_Location(t *testing.T) {
1771	if client == nil {
1772		t.Skip("Integration tests skipped")
1773	}
1774	client.Location = ""
1775	testLocation(t, tokyo)
1776	client.Location = tokyo
1777	defer func() {
1778		client.Location = ""
1779	}()
1780	testLocation(t, "")
1781}
1782
1783func testLocation(t *testing.T, loc string) {
1784	ctx := context.Background()
1785	tokyoDataset := client.Dataset("tokyo")
1786	err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc})
1787	if err != nil && !hasStatusCode(err, 409) { // 409 = already exists
1788		t.Fatal(err)
1789	}
1790	md, err := tokyoDataset.Metadata(ctx)
1791	if err != nil {
1792		t.Fatal(err)
1793	}
1794	if md.Location != tokyo {
1795		t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo)
1796	}
1797	table := tokyoDataset.Table(tableIDs.New())
1798	err = table.Create(context.Background(), &TableMetadata{
1799		Schema: Schema{
1800			{Name: "name", Type: StringFieldType},
1801			{Name: "nums", Type: IntegerFieldType},
1802		},
1803		ExpirationTime: testTableExpiration,
1804	})
1805	if err != nil {
1806		t.Fatal(err)
1807	}
1808	defer table.Delete(ctx)
1809	loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n")))
1810	loader.Location = loc
1811	job, err := loader.Run(ctx)
1812	if err != nil {
1813		t.Fatal("loader.Run", err)
1814	}
1815	if job.Location() != tokyo {
1816		t.Fatalf("job location: got %s, want %s", job.Location(), tokyo)
1817	}
1818	_, err = client.JobFromID(ctx, job.ID())
1819	if client.Location == "" && err == nil {
1820		t.Error("JobFromID with Tokyo job, no client location: want error, got nil")
1821	}
1822	if client.Location != "" && err != nil {
1823		t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err)
1824	}
1825	_, err = client.JobFromIDLocation(ctx, job.ID(), "US")
1826	if err == nil {
1827		t.Error("JobFromIDLocation with US: want error, got nil")
1828	}
1829	job2, err := client.JobFromIDLocation(ctx, job.ID(), loc)
1830	if loc == tokyo && err != nil {
1831		t.Errorf("loc=tokyo: %v", err)
1832	}
1833	if loc == "" && err == nil {
1834		t.Error("loc empty: got nil, want error")
1835	}
1836	if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) {
1837		t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo)
1838	}
1839	if err := wait(ctx, job); err != nil {
1840		t.Fatal(err)
1841	}
1842	// Cancel should succeed even if the job is done.
1843	if err := job.Cancel(ctx); err != nil {
1844		t.Fatal(err)
1845	}
1846
1847	q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
1848	q.Location = loc
1849	iter, err := q.Read(ctx)
1850	if err != nil {
1851		t.Fatal(err)
1852	}
1853	wantRows := [][]Value{
1854		{"a", int64(0)},
1855		{"b", int64(1)},
1856		{"c", int64(2)},
1857	}
1858	checkRead(t, "location", iter, wantRows)
1859
1860	table2 := tokyoDataset.Table(tableIDs.New())
1861	copier := table2.CopierFrom(table)
1862	copier.Location = loc
1863	if _, err := copier.Run(ctx); err != nil {
1864		t.Fatal(err)
1865	}
1866	bucketName := testutil.ProjID()
1867	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
1868	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
1869	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
1870	gr := NewGCSReference(uri)
1871	gr.DestinationFormat = CSV
1872	e := table.ExtractorTo(gr)
1873	e.Location = loc
1874	if _, err := e.Run(ctx); err != nil {
1875		t.Fatal(err)
1876	}
1877}
1878
1879func TestIntegration_NumericErrors(t *testing.T) {
1880	// Verify that the service returns an error for a big.Rat that's too large.
1881	if client == nil {
1882		t.Skip("Integration tests skipped")
1883	}
1884	ctx := context.Background()
1885	schema := Schema{{Name: "n", Type: NumericFieldType}}
1886	table := newTable(t, schema)
1887	defer table.Delete(ctx)
1888	tooBigRat := &big.Rat{}
1889	if _, ok := tooBigRat.SetString("1e40"); !ok {
1890		t.Fatal("big.Rat.SetString failed")
1891	}
1892	upl := table.Uploader()
1893	err := upl.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}})
1894	if err == nil {
1895		t.Fatal("got nil, want error")
1896	}
1897}
1898
1899func TestIntegration_QueryErrors(t *testing.T) {
1900	// Verify that a bad query returns an appropriate error.
1901	if client == nil {
1902		t.Skip("Integration tests skipped")
1903	}
1904	ctx := context.Background()
1905	q := client.Query("blah blah broken")
1906	_, err := q.Read(ctx)
1907	const want = "invalidQuery"
1908	if !strings.Contains(err.Error(), want) {
1909		t.Fatalf("got %q, want substring %q", err, want)
1910	}
1911}
1912
1913func TestIntegration_Model(t *testing.T) {
1914	// Create an ML model.
1915	if client == nil {
1916		t.Skip("Integration tests skipped")
1917	}
1918	ctx := context.Background()
1919	schema := Schema{
1920		{Name: "input", Type: IntegerFieldType},
1921		{Name: "label", Type: IntegerFieldType},
1922	}
1923	table := newTable(t, schema)
1924	defer table.Delete(ctx)
1925
1926	// Insert table data.
1927	tableName := fmt.Sprintf("%s.%s", table.DatasetID, table.TableID)
1928	sql := fmt.Sprintf(`INSERT %s (input, label)
1929		                VALUES (1, 0), (2, 1), (3, 0), (4, 1)`,
1930		tableName)
1931	wantNumRows := 4
1932
1933	if err := runDML(ctx, sql); err != nil {
1934		t.Fatal(err)
1935	}
1936
1937	model := dataset.Table("my_model")
1938	modelName := fmt.Sprintf("%s.%s", model.DatasetID, model.TableID)
1939	sql = fmt.Sprintf(`CREATE MODEL %s OPTIONS (model_type='logistic_reg') AS SELECT input, label FROM %s`,
1940		modelName, tableName)
1941	if err := runDML(ctx, sql); err != nil {
1942		t.Fatal(err)
1943	}
1944	defer model.Delete(ctx)
1945
1946	sql = fmt.Sprintf(`SELECT * FROM ml.PREDICT(MODEL %s, TABLE %s)`, modelName, tableName)
1947	q := client.Query(sql)
1948	ri, err := q.Read(ctx)
1949	if err != nil {
1950		t.Fatal(err)
1951	}
1952	rows, _, _, err := readAll(ri)
1953	if err != nil {
1954		t.Fatal(err)
1955	}
1956	if got := len(rows); got != wantNumRows {
1957		t.Fatalf("got %d rows in prediction table, want %d", got, wantNumRows)
1958	}
1959	iter := dataset.Tables(ctx)
1960	seen := false
1961	for {
1962		tbl, err := iter.Next()
1963		if err == iterator.Done {
1964			break
1965		}
1966		if err != nil {
1967			t.Fatal(err)
1968		}
1969		if tbl.TableID == "my_model" {
1970			seen = true
1971		}
1972	}
1973	if !seen {
1974		t.Fatal("model not listed in dataset")
1975	}
1976	if err := model.Delete(ctx); err != nil {
1977		t.Fatal(err)
1978	}
1979}
1980
1981// Creates a new, temporary table with a unique name and the given schema.
1982func newTable(t *testing.T, s Schema) *Table {
1983	table := dataset.Table(tableIDs.New())
1984	err := table.Create(context.Background(), &TableMetadata{
1985		Schema:         s,
1986		ExpirationTime: testTableExpiration,
1987	})
1988	if err != nil {
1989		t.Fatal(err)
1990	}
1991	return table
1992}
1993
1994func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) {
1995	if msg2, ok := compareRead(it, want, false); !ok {
1996		t.Errorf("%s: %s", msg, msg2)
1997	}
1998}
1999
2000func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) {
2001	if msg2, ok := compareRead(it, want, true); !ok {
2002		t.Errorf("%s: %s", msg, msg2)
2003	}
2004}
2005
2006func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) {
2007	got, _, totalRows, err := readAll(it)
2008	if err != nil {
2009		return err.Error(), false
2010	}
2011	if len(got) != len(want) {
2012		return fmt.Sprintf("got %d rows, want %d", len(got), len(want)), false
2013	}
2014	if compareTotalRows && len(got) != int(totalRows) {
2015		return fmt.Sprintf("got %d rows, but totalRows = %d", len(got), totalRows), false
2016	}
2017	sort.Sort(byCol0(got))
2018	for i, r := range got {
2019		gotRow := []Value(r)
2020		wantRow := want[i]
2021		if !testutil.Equal(gotRow, wantRow) {
2022			return fmt.Sprintf("#%d: got %#v, want %#v", i, gotRow, wantRow), false
2023		}
2024	}
2025	return "", true
2026}
2027
2028func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) {
2029	var (
2030		rows      [][]Value
2031		schema    Schema
2032		totalRows uint64
2033	)
2034	for {
2035		var vals []Value
2036		err := it.Next(&vals)
2037		if err == iterator.Done {
2038			return rows, schema, totalRows, nil
2039		}
2040		if err != nil {
2041			return nil, nil, 0, err
2042		}
2043		rows = append(rows, vals)
2044		schema = it.Schema
2045		totalRows = it.TotalRows
2046	}
2047}
2048
2049type byCol0 [][]Value
2050
2051func (b byCol0) Len() int      { return len(b) }
2052func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
2053func (b byCol0) Less(i, j int) bool {
2054	switch a := b[i][0].(type) {
2055	case string:
2056		return a < b[j][0].(string)
2057	case civil.Date:
2058		return a.Before(b[j][0].(civil.Date))
2059	default:
2060		panic("unknown type")
2061	}
2062}
2063
2064func hasStatusCode(err error, code int) bool {
2065	if e, ok := err.(*googleapi.Error); ok && e.Code == code {
2066		return true
2067	}
2068	return false
2069}
2070
2071// wait polls the job until it is complete or an error is returned.
2072func wait(ctx context.Context, job *Job) error {
2073	status, err := job.Wait(ctx)
2074	if err != nil {
2075		return err
2076	}
2077	if status.Err() != nil {
2078		return fmt.Errorf("job status error: %#v", status.Err())
2079	}
2080	if status.Statistics == nil {
2081		return errors.New("nil Statistics")
2082	}
2083	if status.Statistics.EndTime.IsZero() {
2084		return errors.New("EndTime is zero")
2085	}
2086	if status.Statistics.Details == nil {
2087		return errors.New("nil Statistics.Details")
2088	}
2089	return nil
2090}
2091
2092// waitForRow polls the table until it contains a row.
2093// TODO(jba): use internal.Retry.
2094func waitForRow(ctx context.Context, table *Table) error {
2095	for {
2096		it := table.Read(ctx)
2097		var v []Value
2098		err := it.Next(&v)
2099		if err == nil {
2100			return nil
2101		}
2102		if err != iterator.Done {
2103			return err
2104		}
2105		time.Sleep(1 * time.Second)
2106	}
2107}
2108
2109func putError(err error) string {
2110	pme, ok := err.(PutMultiError)
2111	if !ok {
2112		return err.Error()
2113	}
2114	var msgs []string
2115	for _, err := range pme {
2116		msgs = append(msgs, err.Error())
2117	}
2118	return strings.Join(msgs, "\n")
2119}
2120