1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"time"
22
23	"cloud.google.com/go/internal"
24	"cloud.google.com/go/internal/trace"
25	gax "github.com/googleapis/gax-go/v2"
26	bq "google.golang.org/api/bigquery/v2"
27	"google.golang.org/api/googleapi"
28	"google.golang.org/api/iterator"
29)
30
// A Job represents an operation which has been submitted to BigQuery for processing.
//
// The identifying fields (projectID, jobID, location) are taken from the job
// reference returned by the service; config and lastStatus are filled in from
// the same response when the service supplies them.
type Job struct {
	c          *Client              // client used to issue API calls for this job
	projectID  string               // project ID from the job reference
	jobID      string               // job ID from the job reference
	location   string               // location from the job reference
	email      string               // email of the job's creator, as reported by the service
	config     *bq.JobConfiguration // raw job configuration; nil if none was supplied
	lastStatus *JobStatus           // most recently retrieved status; nil until one is set
}
41
// JobFromID creates a Job which refers to an existing BigQuery job. The job
// need not have been created by this package. For example, the job may have
// been created in the BigQuery console.
//
// The lookup uses Client.Location as the job's location. For jobs whose
// location is other than "US" or "EU", set Client.Location or use
// JobFromIDLocation.
func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
	return c.JobFromIDLocation(ctx, id, c.Location)
}
51
52// JobFromIDLocation creates a Job which refers to an existing BigQuery job. The job
53// need not have been created by this package (for example, it may have
54// been created in the BigQuery console), but it must exist in the specified location.
55func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) {
56	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromIDLocation")
57	defer func() { trace.EndSpan(ctx, err) }()
58
59	bqjob, err := c.getJobInternal(ctx, id, location, "configuration", "jobReference", "status", "statistics")
60	if err != nil {
61		return nil, err
62	}
63	return bqToJob(bqjob, c)
64}
65
// ID returns the job's ID, as recorded in the job reference this Job was
// built from.
func (j *Job) ID() string {
	return j.jobID
}
70
// Location returns the job's location, as recorded in the job reference this
// Job was built from.
func (j *Job) Location() string {
	return j.location
}
75
// Email returns the email of the job's creator, as reported by the service
// when the Job was retrieved.
func (j *Job) Email() string {
	return j.email
}
80
// State is one of a sequence of states that a Job progresses through as it is processed.
type State int

const (
	// StateUnspecified is the default JobIterator state. It is the zero
	// value of State and means that no state filter is applied when listing.
	StateUnspecified State = iota
	// Pending is a state that describes that the job is pending.
	Pending
	// Running is a state that describes that the job is running.
	Running
	// Done is a state that describes that the job is done.
	Done
)
94
// JobStatus contains the current State of a job, and errors encountered while processing that job.
type JobStatus struct {
	// State is the job's state at the time the status was retrieved.
	State State

	// err is the error that caused the job to fail. It is only set once
	// the job is Done; use Err to read it.
	err error

	// All errors encountered during the running of the job.
	// Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful.
	Errors []*Error

	// Statistics about the job.
	Statistics *JobStatistics
}
108
// JobConfig contains configuration information for a job. It is implemented by
// *CopyConfig, *ExtractConfig, *LoadConfig and *QueryConfig.
//
// The interface has a single unexported marker method, so it cannot be
// implemented outside this package.
type JobConfig interface {
	isJobConfig()
}

// Marker methods that make the four configuration types satisfy JobConfig.
func (*CopyConfig) isJobConfig()    {}
func (*ExtractConfig) isJobConfig() {}
func (*LoadConfig) isJobConfig()    {}
func (*QueryConfig) isJobConfig()   {}
119
// Config returns the configuration information for j.
//
// The result is nil (with a nil error) when the job has no stored
// configuration or when the configuration is not a copy, extract, load or
// query configuration.
func (j *Job) Config() (JobConfig, error) {
	return bqToJobConfig(j.config, j.c)
}
124
125// Children returns a job iterator for enumerating child jobs
126// of the current job.  Currently only scripts, a form of query job,
127// will create child jobs.
128func (j *Job) Children(ctx context.Context) *JobIterator {
129	it := j.c.Jobs(ctx)
130	it.ParentJobID = j.ID()
131	return it
132}
133
134func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) {
135	switch {
136	case q == nil:
137		return nil, nil
138	case q.Copy != nil:
139		return bqToCopyConfig(q, c), nil
140	case q.Extract != nil:
141		return bqToExtractConfig(q, c), nil
142	case q.Load != nil:
143		return bqToLoadConfig(q, c), nil
144	case q.Query != nil:
145		return bqToQueryConfig(q, c)
146	default:
147		return nil, nil
148	}
149}
150
// JobIDConfig describes how to create an ID for a job.
type JobIDConfig struct {
	// JobID is the ID to use for the job. If empty, a random job ID will be generated.
	JobID string

	// If AddJobIDSuffix is true, then a random string will be appended to JobID.
	// It has no additional effect when JobID is empty, since the whole ID is
	// random in that case.
	AddJobIDSuffix bool

	// Location is the location for the job. If empty, Client.Location is used.
	Location string
}
162
163// createJobRef creates a JobReference.
164func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference {
165	// We don't check whether projectID is empty; the server will return an
166	// error when it encounters the resulting JobReference.
167	loc := j.Location
168	if loc == "" { // Use Client.Location as a default.
169		loc = c.Location
170	}
171	jr := &bq.JobReference{ProjectId: c.projectID, Location: loc}
172	if j.JobID == "" {
173		jr.JobId = randomIDFn()
174	} else if j.AddJobIDSuffix {
175		jr.JobId = j.JobID + "-" + randomIDFn()
176	} else {
177		jr.JobId = j.JobID
178	}
179	return jr
180}
181
// Done reports whether the job has completed, that is, whether its State is Done.
// After Done returns true, the Err method will return an error if the job completed unsuccessfully.
func (s *JobStatus) Done() bool {
	return s.State == Done
}
187
// Err returns the error that caused the job to complete unsuccessfully (if any).
// It returns nil while the job has not reached the Done state.
func (s *JobStatus) Err() error {
	return s.err
}
192
193// Status retrieves the current status of the job from BigQuery. It fails if the Status could not be determined.
194func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) {
195	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status")
196	defer func() { trace.EndSpan(ctx, err) }()
197
198	bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, "status", "statistics")
199	if err != nil {
200		return nil, err
201	}
202	if err := j.setStatus(bqjob.Status); err != nil {
203		return nil, err
204	}
205	j.setStatistics(bqjob.Statistics, j.c)
206	return j.lastStatus, nil
207}
208
// LastStatus returns the most recently retrieved status of the job. The status is
// retrieved when a new job is created, or when JobFromID or Job.Status is called.
// Call Job.Status to get the most up-to-date information about a job.
//
// LastStatus performs no API call; it may return nil if no status has been
// stored on the Job yet.
func (j *Job) LastStatus() *JobStatus {
	return j.lastStatus
}
215
216// Cancel requests that a job be cancelled. This method returns without waiting for
217// cancellation to take effect. To check whether the job has terminated, use Job.Status.
218// Cancelled jobs may still incur costs.
219func (j *Job) Cancel(ctx context.Context) error {
220	// Jobs.Cancel returns a job entity, but the only relevant piece of
221	// data it may contain (the status of the job) is unreliable.  From the
222	// docs: "This call will return immediately, and the client will need
223	// to poll for the job status to see if the cancel completed
224	// successfully".  So it would be misleading to return a status.
225	call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID).
226		Location(j.location).
227		Fields(). // We don't need any of the response data.
228		Context(ctx)
229	setClientHeader(call.Header())
230	return runWithRetry(ctx, func() error {
231		_, err := call.Do()
232		return err
233	})
234}
235
236// Wait blocks until the job or the context is done. It returns the final status
237// of the job.
238// If an error occurs while retrieving the status, Wait returns that error. But
239// Wait returns nil if the status was retrieved successfully, even if
240// status.Err() != nil. So callers must check both errors. See the example.
241func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) {
242	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait")
243	defer func() { trace.EndSpan(ctx, err) }()
244
245	if j.isQuery() {
246		// We can avoid polling for query jobs.
247		if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil {
248			return nil, err
249		}
250		// Note: extra RPC even if you just want to wait for the query to finish.
251		js, err := j.Status(ctx)
252		if err != nil {
253			return nil, err
254		}
255		return js, nil
256	}
257	// Non-query jobs must poll.
258	err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
259		js, err = j.Status(ctx)
260		if err != nil {
261			return true, err
262		}
263		if js.Done() {
264			return true, nil
265		}
266		return false, nil
267	})
268	if err != nil {
269		return nil, err
270	}
271	return js, nil
272}
273
// Read fetches the results of a query job.
// If j is not a query job, Read returns an error.
//
// Read waits for the query to complete before returning the iterator, so it
// may block; the wait ends early if ctx is done.
func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read")
	defer func() { trace.EndSpan(ctx, err) }()

	return j.read(ctx, j.waitForQuery, fetchPage)
}
282
283func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) {
284	if !j.isQuery() {
285		return nil, errors.New("bigquery: cannot read from a non-query job")
286	}
287	destTable := j.config.Query.DestinationTable
288	// The destination table should only be nil if there was a query error.
289	projectID := j.projectID
290	if destTable != nil && projectID != destTable.ProjectId {
291		return nil, fmt.Errorf("bigquery: job project ID is %q, but destination table's is %q", projectID, destTable.ProjectId)
292	}
293	schema, totalRows, err := waitForQuery(ctx, projectID)
294	if err != nil {
295		return nil, err
296	}
297	if destTable == nil {
298		return nil, errors.New("bigquery: query job missing destination table")
299	}
300	dt := bqToTable(destTable, j.c)
301	it := newRowIterator(ctx, dt, pf)
302	it.Schema = schema
303	it.TotalRows = totalRows
304	return it, nil
305}
306
307// waitForQuery waits for the query job to complete and returns its schema. It also
308// returns the total number of rows in the result set.
309func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) {
310	// Use GetQueryResults only to wait for completion, not to read results.
311	call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0)
312	setClientHeader(call.Header())
313	backoff := gax.Backoff{
314		Initial:    1 * time.Second,
315		Multiplier: 2,
316		Max:        60 * time.Second,
317	}
318	var res *bq.GetQueryResultsResponse
319	err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
320		res, err = call.Do()
321		if err != nil {
322			return !retryableError(err), err
323		}
324		if !res.JobComplete { // GetQueryResults may return early without error; retry.
325			return false, nil
326		}
327		return true, nil
328	})
329	if err != nil {
330		return nil, 0, err
331	}
332	return bqToSchema(res.Schema), res.TotalRows, nil
333}
334
// JobStatistics contains statistics about a job.
//
// CreationTime, StartTime, EndTime and TotalBytesProcessed are populated from
// the corresponding fields of the job statistics returned by the service.
type JobStatistics struct {
	CreationTime        time.Time
	StartTime           time.Time
	EndTime             time.Time
	TotalBytesProcessed int64

	// Details holds the statistics specific to the job type. It is an
	// *ExtractStatistics, *LoadStatistics or *QueryStatistics, or nil when
	// the service reported none of these.
	Details Statistics

	// NumChildJobs indicates the number of child jobs run as part of a script.
	NumChildJobs int64

	// ParentJobID indicates the origin job for jobs run as part of a script.
	ParentJobID string

	// ScriptStatistics includes information run as part of a child job within
	// a script. It is nil when the service reported no script statistics.
	ScriptStatistics *ScriptStatistics
}
354
// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
// Use a type switch or type assertion on the pointer types to access the
// job-type-specific fields.
type Statistics interface {
	implementsStatistics()
}
359
// ExtractStatistics contains statistics about an extract job.
type ExtractStatistics struct {
	// The number of files per destination URI or URI pattern specified in the
	// extract configuration. These values will be in the same order as the
	// URIs specified in the 'destinationUris' field.
	DestinationURIFileCounts []int64
}
367
// LoadStatistics contains statistics about a load job.
type LoadStatistics struct {
	// The number of bytes of source data in a load job.
	InputFileBytes int64

	// The number of source files in a load job.
	InputFiles int64

	// Size of the loaded data in bytes. Note that while a load job is in the
	// running state, this value may change.
	OutputBytes int64

	// The number of rows imported in a load job. Note that while a load job is
	// in the running state, this value may change.
	OutputRows int64
}
384
// QueryStatistics contains statistics about a query job.
type QueryStatistics struct {
	// Billing tier for the job.
	BillingTier int64

	// Whether the query result was fetched from the query cache.
	CacheHit bool

	// The type of query statement, if valid.
	StatementType string

	// Total bytes billed for the job.
	TotalBytesBilled int64

	// Total bytes processed for the job.
	TotalBytesProcessed int64

	// For dry run queries, indicates how accurate the TotalBytesProcessed value is.
	// When indicated, values include:
	// UNKNOWN: accuracy of the estimate is unknown.
	// PRECISE: estimate is precise.
	// LOWER_BOUND: estimate is lower bound of what the query would cost.
	// UPPER_BOUND: estimate is upper bound of what the query would cost.
	TotalBytesProcessedAccuracy string

	// Describes execution plan for the query.
	QueryPlan []*ExplainQueryStage

	// The number of rows affected by a DML statement. Present only for DML
	// statements INSERT, UPDATE or DELETE.
	NumDMLAffectedRows int64

	// Describes a timeline of job execution.
	Timeline []*QueryTimelineSample

	// ReferencedTables: [Output-only] Referenced tables for
	// the job. Queries that reference more than 50 tables will not have a
	// complete list.
	ReferencedTables []*Table

	// The schema of the results. Present only for successful dry run of
	// non-legacy SQL queries.
	Schema Schema

	// Slot-milliseconds consumed by this query job.
	SlotMillis int64

	// Standard SQL: list of undeclared query parameter names detected during a
	// dry run validation.
	UndeclaredQueryParameterNames []string

	// DDL target table.
	DDLTargetTable *Table

	// DDL Operation performed on the target table.  Used to report how the
	// query impacted the DDL target table.
	DDLOperationPerformed string

	// The DDL target routine, present only for CREATE/DROP FUNCTION/PROCEDURE queries.
	DDLTargetRoutine *Routine
}
446
// ExplainQueryStage describes one stage of a query.
//
// Duration fields are converted from the millisecond counts reported by the
// service, and StartTime/EndTime from its millisecond timestamps.
type ExplainQueryStage struct {
	// CompletedParallelInputs: Number of parallel input segments completed.
	CompletedParallelInputs int64

	// ComputeAvg: Duration the average shard spent on CPU-bound tasks.
	ComputeAvg time.Duration

	// ComputeMax: Duration the slowest shard spent on CPU-bound tasks.
	ComputeMax time.Duration

	// Relative amount of the total time the average shard spent on CPU-bound tasks.
	ComputeRatioAvg float64

	// Relative amount of the total time the slowest shard spent on CPU-bound tasks.
	ComputeRatioMax float64

	// EndTime: Stage end time.
	EndTime time.Time

	// Unique ID for stage within plan.
	ID int64

	// InputStages: IDs for stages that are inputs to this stage.
	InputStages []int64

	// Human-readable name for stage.
	Name string

	// ParallelInputs: Number of parallel input segments to be processed.
	ParallelInputs int64

	// ReadAvg: Duration the average shard spent reading input.
	ReadAvg time.Duration

	// ReadMax: Duration the slowest shard spent reading input.
	ReadMax time.Duration

	// Relative amount of the total time the average shard spent reading input.
	ReadRatioAvg float64

	// Relative amount of the total time the slowest shard spent reading input.
	ReadRatioMax float64

	// Number of records read into the stage.
	RecordsRead int64

	// Number of records written by the stage.
	RecordsWritten int64

	// ShuffleOutputBytes: Total number of bytes written to shuffle.
	ShuffleOutputBytes int64

	// ShuffleOutputBytesSpilled: Total number of bytes written to shuffle
	// and spilled to disk.
	ShuffleOutputBytesSpilled int64

	// StartTime: Stage start time.
	StartTime time.Time

	// Current status for the stage.
	Status string

	// List of operations within the stage in dependency order (approximately
	// chronological).
	Steps []*ExplainQueryStep

	// WaitAvg: Duration the average shard spent waiting to be scheduled.
	WaitAvg time.Duration

	// WaitMax: Duration the slowest shard spent waiting to be scheduled.
	WaitMax time.Duration

	// Relative amount of the total time the average shard spent waiting to be scheduled.
	WaitRatioAvg float64

	// Relative amount of the total time the slowest shard spent waiting to be scheduled.
	WaitRatioMax float64

	// WriteAvg: Duration the average shard spent on writing output.
	WriteAvg time.Duration

	// WriteMax: Duration the slowest shard spent on writing output.
	WriteMax time.Duration

	// Relative amount of the total time the average shard spent on writing output.
	WriteRatioAvg float64

	// Relative amount of the total time the slowest shard spent on writing output.
	WriteRatioMax float64
}
538
// ExplainQueryStep describes one step of a query stage.
type ExplainQueryStep struct {
	// Machine-readable operation type.
	Kind string

	// Human-readable step descriptions.
	Substeps []string
}
547
// QueryTimelineSample represents a sample of execution statistics at a point in time.
type QueryTimelineSample struct {

	// Total number of units currently being processed by workers, represented as largest value since last sample.
	ActiveUnits int64

	// Total parallel units of work completed by this query.
	CompletedUnits int64

	// Time elapsed since start of query execution. Converted from the
	// millisecond count reported by the service.
	Elapsed time.Duration

	// Total parallel units of work remaining for the active stages.
	PendingUnits int64

	// Cumulative slot-milliseconds consumed by the query.
	SlotMillis int64
}
566
// ScriptStatistics report information about script-based query jobs.
type ScriptStatistics struct {
	EvaluationKind string              // evaluation kind, copied verbatim from the service response
	StackFrames    []*ScriptStackFrame // stack frames in the order reported by the service
}
572
573func bqToScriptStatistics(bs *bq.ScriptStatistics) *ScriptStatistics {
574	if bs == nil {
575		return nil
576	}
577	ss := &ScriptStatistics{
578		EvaluationKind: bs.EvaluationKind,
579	}
580	for _, f := range bs.StackFrames {
581		ss.StackFrames = append(ss.StackFrames, bqToScriptStackFrame(f))
582	}
583	return ss
584}
585
// ScriptStackFrame represents the location of the statement/expression being evaluated.
//
// Line and column numbers are defined as follows:
//
// - Line and column numbers start with one.  That is, line 1 column 1 denotes
//   the start of the script.
// - When inside a stored procedure, all line/column numbers are relative
//   to the procedure body, not the script in which the procedure was defined.
// - Start/end positions exclude leading/trailing comments and whitespace.
//   The end position always ends with a ";", when present.
// - Multi-byte Unicode characters are treated as just one column.
// - If the original script (or procedure definition) contains TAB characters,
//   a tab "snaps" the indentation forward to the nearest multiple of 8
//   characters, plus 1. For example, a TAB on column 1, 2, 3, 4, 5, 6, 7, or 8
//   will advance the next character to column 9.  A TAB on column 9, 10, 11,
//   12, 13, 14, 15, or 16 will advance the next character to column 17.
type ScriptStackFrame struct {
	StartLine   int64
	StartColumn int64
	EndLine     int64
	EndColumn   int64
	// Name of the active procedure.  Empty if in a top-level script.
	ProcedureID string
	// Text of the current statement/expression.
	Text string
}
612
613func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame {
614	if bsf == nil {
615		return nil
616	}
617	return &ScriptStackFrame{
618		StartLine:   bsf.StartLine,
619		StartColumn: bsf.StartColumn,
620		EndLine:     bsf.EndLine,
621		EndColumn:   bsf.EndColumn,
622		ProcedureID: bsf.ProcedureId,
623		Text:        bsf.Text,
624	}
625}
626
// Marker methods that make the three statistics types satisfy Statistics.
func (*ExtractStatistics) implementsStatistics() {}
func (*LoadStatistics) implementsStatistics()    {}
func (*QueryStatistics) implementsStatistics()   {}
630
631// Jobs lists jobs within a project.
632func (c *Client) Jobs(ctx context.Context) *JobIterator {
633	it := &JobIterator{
634		ctx:       ctx,
635		c:         c,
636		ProjectID: c.projectID,
637	}
638	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
639		it.fetch,
640		func() int { return len(it.items) },
641		func() interface{} { b := it.items; it.items = nil; return b })
642	return it
643}
644
// JobIterator iterates over jobs in a project.
//
// The exported fields configure the listing and are read each time a page is
// fetched.
type JobIterator struct {
	ProjectID       string    // Project ID of the jobs to list. Default is the client's project.
	AllUsers        bool      // Whether to list jobs owned by all users in the project, or just the current caller.
	State           State     // List only jobs in the given state. Defaults to all states.
	MinCreationTime time.Time // List only jobs created after this time. Sent with millisecond precision.
	MaxCreationTime time.Time // List only jobs created before this time. Sent with millisecond precision.
	ParentJobID     string    // List only jobs that are children of a given scripting job.

	ctx      context.Context    // context used for every list request
	c        *Client            // client that issues the list requests
	pageInfo *iterator.PageInfo // paging state exposed through PageInfo
	nextFunc func() error       // ensures at least one buffered item, or returns an error
	items    []*Job             // jobs fetched but not yet returned by Next
}
660
// PageInfo is a getter for the JobIterator's PageInfo, which supports
// pagination of the listing.
func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
663
664// Next returns the next Job. Its second return value is iterator.Done if
665// there are no more results. Once Next returns Done, all subsequent calls will
666// return Done.
667func (it *JobIterator) Next() (*Job, error) {
668	if err := it.nextFunc(); err != nil {
669		return nil, err
670	}
671	item := it.items[0]
672	it.items = it.items[1:]
673	return item, nil
674}
675
676func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
677	var st string
678	switch it.State {
679	case StateUnspecified:
680		st = ""
681	case Pending:
682		st = "pending"
683	case Running:
684		st = "running"
685	case Done:
686		st = "done"
687	default:
688		return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
689	}
690
691	req := it.c.bqs.Jobs.List(it.ProjectID).
692		Context(it.ctx).
693		PageToken(pageToken).
694		Projection("full").
695		AllUsers(it.AllUsers)
696	if st != "" {
697		req.StateFilter(st)
698	}
699	if !it.MinCreationTime.IsZero() {
700		req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6))
701	}
702	if !it.MaxCreationTime.IsZero() {
703		req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6))
704	}
705	setClientHeader(req.Header())
706	if pageSize > 0 {
707		req.MaxResults(int64(pageSize))
708	}
709	if it.ParentJobID != "" {
710		req.ParentJobId(it.ParentJobID)
711	}
712	res, err := req.Do()
713	if err != nil {
714		return "", err
715	}
716	for _, j := range res.Jobs {
717		job, err := convertListedJob(j, it.c)
718		if err != nil {
719			return "", err
720		}
721		it.items = append(it.items, job)
722	}
723	return res.NextPageToken, nil
724}
725
// convertListedJob converts one entry of a job listing into a Job by passing
// its reference, configuration, status, statistics and user email to bqToJob2.
func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) {
	return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c)
}
729
730func (c *Client) getJobInternal(ctx context.Context, jobID, location string, fields ...googleapi.Field) (*bq.Job, error) {
731	var job *bq.Job
732	call := c.bqs.Jobs.Get(c.projectID, jobID).Context(ctx)
733	if location != "" {
734		call = call.Location(location)
735	}
736	if len(fields) > 0 {
737		call = call.Fields(fields...)
738	}
739	setClientHeader(call.Header())
740	err := runWithRetry(ctx, func() (err error) {
741		job, err = call.Do()
742		return err
743	})
744	if err != nil {
745		return nil, err
746	}
747	return job, nil
748}
749
// bqToJob converts a job returned by the service into a Job bound to client c
// by passing its reference, configuration, status, statistics and user email
// to bqToJob2.
func bqToJob(q *bq.Job, c *Client) (*Job, error) {
	return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c)
}
753
754func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) {
755	j := &Job{
756		projectID: qr.ProjectId,
757		jobID:     qr.JobId,
758		location:  qr.Location,
759		c:         c,
760		email:     email,
761	}
762	j.setConfig(qc)
763	if err := j.setStatus(qs); err != nil {
764		return nil, err
765	}
766	j.setStatistics(qt, c)
767	return j, nil
768}
769
770func (j *Job) setConfig(config *bq.JobConfiguration) {
771	if config == nil {
772		return
773	}
774	j.config = config
775}
776
777func (j *Job) isQuery() bool {
778	return j.config != nil && j.config.Query != nil
779}
780
781var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}
782
783func (j *Job) setStatus(qs *bq.JobStatus) error {
784	if qs == nil {
785		return nil
786	}
787	state, ok := stateMap[qs.State]
788	if !ok {
789		return fmt.Errorf("unexpected job state: %v", qs.State)
790	}
791	j.lastStatus = &JobStatus{
792		State: state,
793		err:   nil,
794	}
795	if err := bqToError(qs.ErrorResult); state == Done && err != nil {
796		j.lastStatus.err = err
797	}
798	for _, ep := range qs.Errors {
799		j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep))
800	}
801	return nil
802}
803
804func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
805	if s == nil || j.lastStatus == nil {
806		return
807	}
808	js := &JobStatistics{
809		CreationTime:        unixMillisToTime(s.CreationTime),
810		StartTime:           unixMillisToTime(s.StartTime),
811		EndTime:             unixMillisToTime(s.EndTime),
812		TotalBytesProcessed: s.TotalBytesProcessed,
813		NumChildJobs:        s.NumChildJobs,
814		ParentJobID:         s.ParentJobId,
815		ScriptStatistics:    bqToScriptStatistics(s.ScriptStatistics),
816	}
817	switch {
818	case s.Extract != nil:
819		js.Details = &ExtractStatistics{
820			DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts),
821		}
822	case s.Load != nil:
823		js.Details = &LoadStatistics{
824			InputFileBytes: s.Load.InputFileBytes,
825			InputFiles:     s.Load.InputFiles,
826			OutputBytes:    s.Load.OutputBytes,
827			OutputRows:     s.Load.OutputRows,
828		}
829	case s.Query != nil:
830		var names []string
831		for _, qp := range s.Query.UndeclaredQueryParameters {
832			names = append(names, qp.Name)
833		}
834		var tables []*Table
835		for _, tr := range s.Query.ReferencedTables {
836			tables = append(tables, bqToTable(tr, c))
837		}
838		js.Details = &QueryStatistics{
839			BillingTier:                   s.Query.BillingTier,
840			CacheHit:                      s.Query.CacheHit,
841			DDLTargetTable:                bqToTable(s.Query.DdlTargetTable, c),
842			DDLOperationPerformed:         s.Query.DdlOperationPerformed,
843			DDLTargetRoutine:              bqToRoutine(s.Query.DdlTargetRoutine, c),
844			StatementType:                 s.Query.StatementType,
845			TotalBytesBilled:              s.Query.TotalBytesBilled,
846			TotalBytesProcessed:           s.Query.TotalBytesProcessed,
847			TotalBytesProcessedAccuracy:   s.Query.TotalBytesProcessedAccuracy,
848			NumDMLAffectedRows:            s.Query.NumDmlAffectedRows,
849			QueryPlan:                     queryPlanFromProto(s.Query.QueryPlan),
850			Schema:                        bqToSchema(s.Query.Schema),
851			SlotMillis:                    s.Query.TotalSlotMs,
852			Timeline:                      timelineFromProto(s.Query.Timeline),
853			ReferencedTables:              tables,
854			UndeclaredQueryParameterNames: names,
855		}
856	}
857	j.lastStatus.Statistics = js
858}
859
860func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage {
861	var res []*ExplainQueryStage
862	for _, s := range stages {
863		var steps []*ExplainQueryStep
864		for _, p := range s.Steps {
865			steps = append(steps, &ExplainQueryStep{
866				Kind:     p.Kind,
867				Substeps: p.Substeps,
868			})
869		}
870		res = append(res, &ExplainQueryStage{
871			CompletedParallelInputs:   s.CompletedParallelInputs,
872			ComputeAvg:                time.Duration(s.ComputeMsAvg) * time.Millisecond,
873			ComputeMax:                time.Duration(s.ComputeMsMax) * time.Millisecond,
874			ComputeRatioAvg:           s.ComputeRatioAvg,
875			ComputeRatioMax:           s.ComputeRatioMax,
876			EndTime:                   time.Unix(0, s.EndMs*1e6),
877			ID:                        s.Id,
878			InputStages:               s.InputStages,
879			Name:                      s.Name,
880			ParallelInputs:            s.ParallelInputs,
881			ReadAvg:                   time.Duration(s.ReadMsAvg) * time.Millisecond,
882			ReadMax:                   time.Duration(s.ReadMsMax) * time.Millisecond,
883			ReadRatioAvg:              s.ReadRatioAvg,
884			ReadRatioMax:              s.ReadRatioMax,
885			RecordsRead:               s.RecordsRead,
886			RecordsWritten:            s.RecordsWritten,
887			ShuffleOutputBytes:        s.ShuffleOutputBytes,
888			ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled,
889			StartTime:                 time.Unix(0, s.StartMs*1e6),
890			Status:                    s.Status,
891			Steps:                     steps,
892			WaitAvg:                   time.Duration(s.WaitMsAvg) * time.Millisecond,
893			WaitMax:                   time.Duration(s.WaitMsMax) * time.Millisecond,
894			WaitRatioAvg:              s.WaitRatioAvg,
895			WaitRatioMax:              s.WaitRatioMax,
896			WriteAvg:                  time.Duration(s.WriteMsAvg) * time.Millisecond,
897			WriteMax:                  time.Duration(s.WriteMsMax) * time.Millisecond,
898			WriteRatioAvg:             s.WriteRatioAvg,
899			WriteRatioMax:             s.WriteRatioMax,
900		})
901	}
902	return res
903}
904
905func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample {
906	var res []*QueryTimelineSample
907	for _, s := range timeline {
908		res = append(res, &QueryTimelineSample{
909			ActiveUnits:    s.ActiveUnits,
910			CompletedUnits: s.CompletedUnits,
911			Elapsed:        time.Duration(s.ElapsedMs) * time.Millisecond,
912			PendingUnits:   s.PendingUnits,
913			SlotMillis:     s.TotalSlotMs,
914		})
915	}
916	return res
917}
918