1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"time"
22
23	"cloud.google.com/go/internal"
24	"cloud.google.com/go/internal/trace"
25	gax "github.com/googleapis/gax-go/v2"
26	bq "google.golang.org/api/bigquery/v2"
27	"google.golang.org/api/googleapi"
28	"google.golang.org/api/iterator"
29)
30
// A Job represents an operation which has been submitted to BigQuery for processing.
//
// A Job is a client-side handle: it stores the identifiers needed to address
// the job plus the configuration and status most recently recorded for it.
type Job struct {
	c          *Client              // client used to issue requests for this job
	projectID  string               // returned by ProjectID
	jobID      string               // returned by ID
	location   string               // returned by Location
	email      string               // returned by Email
	config     *bq.JobConfiguration // raw API configuration; stays nil if none was supplied
	lastStatus *JobStatus           // returned by LastStatus; replaced by setStatus
}
41
42// JobFromID creates a Job which refers to an existing BigQuery job. The job
43// need not have been created by this package. For example, the job may have
44// been created in the BigQuery console.
45//
46// For jobs whose location is other than "US" or "EU", set Client.Location or use
47// JobFromIDLocation.
48func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
49	return c.JobFromIDLocation(ctx, id, c.Location)
50}
51
52// JobFromIDLocation creates a Job which refers to an existing BigQuery job. The job
53// need not have been created by this package (for example, it may have
54// been created in the BigQuery console), but it must exist in the specified location.
55func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) {
56	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromIDLocation")
57	defer func() { trace.EndSpan(ctx, err) }()
58
59	bqjob, err := c.getJobInternal(ctx, id, location, "configuration", "jobReference", "status", "statistics")
60	if err != nil {
61		return nil, err
62	}
63	return bqToJob(bqjob, c)
64}
65
// ProjectID returns the ID of the project associated with the job.
func (j *Job) ProjectID() string {
	return j.projectID
}
70
// ID returns the job's ID. Together with ProjectID and Location it
// identifies the job in requests made by this package.
func (j *Job) ID() string {
	return j.jobID
}
75
// Location returns the job's location. The value may be empty when no
// location was recorded for the job.
func (j *Job) Location() string {
	return j.location
}
80
// Email returns the email address of the job's creator, as recorded when the
// Job value was built from an API response.
func (j *Job) Email() string {
	return j.email
}
85
// State is one of a sequence of states that a Job progresses through as it is processed.
// The defined values are StateUnspecified, Pending, Running and Done.
type State int
88
const (
	// StateUnspecified is the default JobIterator state. It is the zero value
	// of State and makes a JobIterator list jobs in every state.
	StateUnspecified State = iota
	// Pending is a state that describes that the job is pending.
	Pending
	// Running is a state that describes that the job is running.
	Running
	// Done is a state that describes that the job is done. A job that is Done
	// may still have failed; see JobStatus.Err.
	Done
)
99
// JobStatus contains the current State of a job, and errors encountered while processing that job.
type JobStatus struct {
	// State is the state the job was in when this status was recorded.
	State State

	// err is the error that made the job finish unsuccessfully. It is only
	// populated when the job is Done and reported an error result; read it
	// through Err.
	err error

	// All errors encountered during the running of the job.
	// Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful.
	Errors []*Error

	// Statistics about the job.
	Statistics *JobStatistics
}
113
// JobConfig contains configuration information for a job. It is implemented by
// *CopyConfig, *ExtractConfig, *LoadConfig and *QueryConfig.
type JobConfig interface {
	// isJobConfig is an unexported marker method, so only types declared in
	// this package can satisfy JobConfig.
	isJobConfig()
}
119
// Marker methods: these empty implementations make the four config types
// satisfy the JobConfig interface.
func (*CopyConfig) isJobConfig()    {}
func (*ExtractConfig) isJobConfig() {}
func (*LoadConfig) isJobConfig()    {}
func (*QueryConfig) isJobConfig()   {}
124
// Config returns the configuration information for j, converted from the raw
// API configuration stored on the Job. The returned JobConfig is nil when the
// Job holds no configuration or the configuration is not a copy, extract,
// load or query configuration.
func (j *Job) Config() (JobConfig, error) {
	return bqToJobConfig(j.config, j.c)
}
129
130// Children returns a job iterator for enumerating child jobs
131// of the current job.  Currently only scripts, a form of query job,
132// will create child jobs.
133func (j *Job) Children(ctx context.Context) *JobIterator {
134	it := j.c.Jobs(ctx)
135	it.ParentJobID = j.ID()
136	return it
137}
138
139func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) {
140	switch {
141	case q == nil:
142		return nil, nil
143	case q.Copy != nil:
144		return bqToCopyConfig(q, c), nil
145	case q.Extract != nil:
146		return bqToExtractConfig(q, c), nil
147	case q.Load != nil:
148		return bqToLoadConfig(q, c), nil
149	case q.Query != nil:
150		return bqToQueryConfig(q, c)
151	default:
152		return nil, nil
153	}
154}
155
// JobIDConfig describes how to create an ID for a job.
type JobIDConfig struct {
	// JobID is the ID to use for the job. If empty, a random job ID will be generated.
	JobID string

	// If AddJobIDSuffix is true, then a random string will be appended to JobID,
	// separated from it by a hyphen. It has no effect when JobID is empty.
	AddJobIDSuffix bool

	// Location is the location for the job. If empty, Client.Location is used.
	Location string
}
167
168// createJobRef creates a JobReference.
169func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference {
170	// We don't check whether projectID is empty; the server will return an
171	// error when it encounters the resulting JobReference.
172	loc := j.Location
173	if loc == "" { // Use Client.Location as a default.
174		loc = c.Location
175	}
176	jr := &bq.JobReference{ProjectId: c.projectID, Location: loc}
177	if j.JobID == "" {
178		jr.JobId = randomIDFn()
179	} else if j.AddJobIDSuffix {
180		jr.JobId = j.JobID + "-" + randomIDFn()
181	} else {
182		jr.JobId = j.JobID
183	}
184	return jr
185}
186
// Done reports whether the job has completed, that is, whether s.State is Done.
// Completion does not imply success: after Done returns true, the Err method
// will return an error if the job completed unsuccessfully.
func (s *JobStatus) Done() bool {
	return s.State == Done
}
192
// Err returns the error that caused the job to complete unsuccessfully (if any).
// It returns nil for a job that has not finished or that finished without an
// error result.
func (s *JobStatus) Err() error {
	return s.err
}
197
198// Status retrieves the current status of the job from BigQuery. It fails if the Status could not be determined.
199func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) {
200	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status")
201	defer func() { trace.EndSpan(ctx, err) }()
202
203	bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, "status", "statistics")
204	if err != nil {
205		return nil, err
206	}
207	if err := j.setStatus(bqjob.Status); err != nil {
208		return nil, err
209	}
210	j.setStatistics(bqjob.Statistics, j.c)
211	return j.lastStatus, nil
212}
213
// LastStatus returns the most recently retrieved status of the job. The status is
// retrieved when a new job is created, or when JobFromID or Job.Status is called.
// Call Job.Status to get the most up-to-date information about a job.
//
// LastStatus does not contact BigQuery; it only returns the value cached on
// the Job, which may be nil if no status has been recorded yet.
func (j *Job) LastStatus() *JobStatus {
	return j.lastStatus
}
220
221// Cancel requests that a job be cancelled. This method returns without waiting for
222// cancellation to take effect. To check whether the job has terminated, use Job.Status.
223// Cancelled jobs may still incur costs.
224func (j *Job) Cancel(ctx context.Context) error {
225	// Jobs.Cancel returns a job entity, but the only relevant piece of
226	// data it may contain (the status of the job) is unreliable.  From the
227	// docs: "This call will return immediately, and the client will need
228	// to poll for the job status to see if the cancel completed
229	// successfully".  So it would be misleading to return a status.
230	call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID).
231		Location(j.location).
232		Fields(). // We don't need any of the response data.
233		Context(ctx)
234	setClientHeader(call.Header())
235	return runWithRetry(ctx, func() error {
236		_, err := call.Do()
237		return err
238	})
239}
240
241// Delete deletes the job.
242func (j *Job) Delete(ctx context.Context) (err error) {
243	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Delete")
244	defer func() { trace.EndSpan(ctx, err) }()
245
246	call := j.c.bqs.Jobs.Delete(j.projectID, j.jobID).Context(ctx)
247	if j.location != "" {
248		call = call.Location(j.location)
249	}
250	setClientHeader(call.Header())
251
252	return runWithRetry(ctx, func() (err error) {
253		err = call.Do()
254		return err
255	})
256}
257
258// Wait blocks until the job or the context is done. It returns the final status
259// of the job.
260// If an error occurs while retrieving the status, Wait returns that error. But
261// Wait returns nil if the status was retrieved successfully, even if
262// status.Err() != nil. So callers must check both errors. See the example.
263func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) {
264	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait")
265	defer func() { trace.EndSpan(ctx, err) }()
266
267	if j.isQuery() {
268		// We can avoid polling for query jobs.
269		if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil {
270			return nil, err
271		}
272		// Note: extra RPC even if you just want to wait for the query to finish.
273		js, err := j.Status(ctx)
274		if err != nil {
275			return nil, err
276		}
277		return js, nil
278	}
279	// Non-query jobs must poll.
280	err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
281		js, err = j.Status(ctx)
282		if err != nil {
283			return true, err
284		}
285		if js.Done() {
286			return true, nil
287		}
288		return false, nil
289	})
290	if err != nil {
291		return nil, err
292	}
293	return js, nil
294}
295
296// Read fetches the results of a query job.
297// If j is not a query job, Read returns an error.
298func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) {
299	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read")
300	defer func() { trace.EndSpan(ctx, err) }()
301
302	return j.read(ctx, j.waitForQuery, fetchPage)
303}
304
305func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) {
306	if !j.isQuery() {
307		return nil, errors.New("bigquery: cannot read from a non-query job")
308	}
309	schema, totalRows, err := waitForQuery(ctx, j.projectID)
310	if err != nil {
311		return nil, err
312	}
313	// Shave off some potential overhead by only retaining the minimal job representation in the iterator.
314	itJob := &Job{
315		c:         j.c,
316		projectID: j.projectID,
317		jobID:     j.jobID,
318		location:  j.location,
319	}
320	it := newRowIterator(ctx, &rowSource{j: itJob}, pf)
321	it.Schema = schema
322	it.TotalRows = totalRows
323	return it, nil
324}
325
326// waitForQuery waits for the query job to complete and returns its schema. It also
327// returns the total number of rows in the result set.
328func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) {
329	// Use GetQueryResults only to wait for completion, not to read results.
330	call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0)
331	setClientHeader(call.Header())
332	backoff := gax.Backoff{
333		Initial:    1 * time.Second,
334		Multiplier: 2,
335		Max:        60 * time.Second,
336	}
337	var res *bq.GetQueryResultsResponse
338	err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
339		res, err = call.Do()
340		if err != nil {
341			return !retryableError(err), err
342		}
343		if !res.JobComplete { // GetQueryResults may return early without error; retry.
344			return false, nil
345		}
346		return true, nil
347	})
348	if err != nil {
349		return nil, 0, err
350	}
351	return bqToSchema(res.Schema), res.TotalRows, nil
352}
353
// JobStatistics contains statistics about a job.
type JobStatistics struct {
	// Timestamps and byte count reported for the job as a whole.
	CreationTime        time.Time
	StartTime           time.Time
	EndTime             time.Time
	TotalBytesProcessed int64

	// Details holds the statistics specific to the kind of job: extract, load
	// or query. It is nil when none of those statistics were reported.
	Details Statistics

	// NumChildJobs indicates the number of child jobs run as part of a script.
	NumChildJobs int64

	// ParentJobID indicates the origin job for jobs run as part of a script.
	ParentJobID string

	// ScriptStatistics includes information about a child job run as part
	// of a script.
	ScriptStatistics *ScriptStatistics

	// ReservationUsage attributes slot consumption to reservations.
	ReservationUsage []*ReservationUsage

	// TransactionInfo indicates the transaction ID associated with the job, if any.
	TransactionInfo *TransactionInfo
}
379
// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
// It is implemented by pointers to those types.
type Statistics interface {
	// implementsStatistics is an unexported marker method, so only types
	// declared in this package can satisfy Statistics.
	implementsStatistics()
}
384
// ExtractStatistics contains statistics about an extract job.
type ExtractStatistics struct {
	// DestinationURIFileCounts is the number of files per destination URI or
	// URI pattern specified in the extract configuration. These values will be
	// in the same order as the URIs specified in the 'destinationUris' field.
	DestinationURIFileCounts []int64
}
392
// LoadStatistics contains statistics about a load job.
type LoadStatistics struct {
	// InputFileBytes is the number of bytes of source data in a load job.
	InputFileBytes int64

	// InputFiles is the number of source files in a load job.
	InputFiles int64

	// OutputBytes is the size of the loaded data in bytes. Note that while a
	// load job is in the running state, this value may change.
	OutputBytes int64

	// OutputRows is the number of rows imported in a load job. Note that while
	// a load job is in the running state, this value may change.
	OutputRows int64
}
409
// QueryStatistics contains statistics about a query job.
type QueryStatistics struct {
	// Billing tier for the job.
	BillingTier int64

	// Whether the query result was fetched from the query cache.
	CacheHit bool

	// The type of query statement, if valid.
	StatementType string

	// Total bytes billed for the job.
	TotalBytesBilled int64

	// Total bytes processed for the job.
	TotalBytesProcessed int64

	// For dry run queries, indicates how accurate the TotalBytesProcessed value is.
	// When indicated, values include:
	// UNKNOWN: accuracy of the estimate is unknown.
	// PRECISE: estimate is precise.
	// LOWER_BOUND: estimate is lower bound of what the query would cost.
	// UPPER_BOUND: estimate is upper bound of what the query would cost.
	TotalBytesProcessedAccuracy string

	// Describes execution plan for the query.
	QueryPlan []*ExplainQueryStage

	// The number of rows affected by a DML statement. Present only for DML
	// statements INSERT, UPDATE or DELETE.
	NumDMLAffectedRows int64

	// DMLStats provides statistics about the row mutations performed by
	// DML statements.
	DMLStats *DMLStatistics

	// Describes a timeline of job execution.
	Timeline []*QueryTimelineSample

	// ReferencedTables: [Output-only] Referenced tables for
	// the job. Queries that reference more than 50 tables will not have a
	// complete list.
	ReferencedTables []*Table

	// The schema of the results. Present only for successful dry run of
	// non-legacy SQL queries.
	Schema Schema

	// Slot-milliseconds consumed by this query job.
	SlotMillis int64

	// Standard SQL: list of undeclared query parameter names detected during a
	// dry run validation.
	UndeclaredQueryParameterNames []string

	// DDL target table.
	DDLTargetTable *Table

	// DDL Operation performed on the target table.  Used to report how the
	// query impacted the DDL target table.
	DDLOperationPerformed string

	// The DDL target routine, present only for CREATE/DROP FUNCTION/PROCEDURE queries.
	DDLTargetRoutine *Routine
}
475
// ExplainQueryStage describes one stage of a query.
//
// Durations and timestamps are derived from the millisecond values reported
// by the API for the stage.
type ExplainQueryStage struct {
	// CompletedParallelInputs: Number of parallel input segments completed.
	CompletedParallelInputs int64

	// ComputeAvg: Duration the average shard spent on CPU-bound tasks.
	ComputeAvg time.Duration

	// ComputeMax: Duration the slowest shard spent on CPU-bound tasks.
	ComputeMax time.Duration

	// ComputeRatioAvg: Relative amount of the total time the average shard
	// spent on CPU-bound tasks.
	ComputeRatioAvg float64

	// ComputeRatioMax: Relative amount of the total time the slowest shard
	// spent on CPU-bound tasks.
	ComputeRatioMax float64

	// EndTime: Stage end time.
	EndTime time.Time

	// ID: Unique ID for stage within plan.
	ID int64

	// InputStages: IDs for stages that are inputs to this stage.
	InputStages []int64

	// Name: Human-readable name for stage.
	Name string

	// ParallelInputs: Number of parallel input segments to be processed.
	ParallelInputs int64

	// ReadAvg: Duration the average shard spent reading input.
	ReadAvg time.Duration

	// ReadMax: Duration the slowest shard spent reading input.
	ReadMax time.Duration

	// ReadRatioAvg: Relative amount of the total time the average shard spent
	// reading input.
	ReadRatioAvg float64

	// ReadRatioMax: Relative amount of the total time the slowest shard spent
	// reading input.
	ReadRatioMax float64

	// RecordsRead: Number of records read into the stage.
	RecordsRead int64

	// RecordsWritten: Number of records written by the stage.
	RecordsWritten int64

	// ShuffleOutputBytes: Total number of bytes written to shuffle.
	ShuffleOutputBytes int64

	// ShuffleOutputBytesSpilled: Total number of bytes written to shuffle
	// and spilled to disk.
	ShuffleOutputBytesSpilled int64

	// StartTime: Stage start time.
	StartTime time.Time

	// Status: Current status for the stage.
	Status string

	// Steps: List of operations within the stage in dependency order
	// (approximately chronological).
	Steps []*ExplainQueryStep

	// WaitAvg: Duration the average shard spent waiting to be scheduled.
	WaitAvg time.Duration

	// WaitMax: Duration the slowest shard spent waiting to be scheduled.
	WaitMax time.Duration

	// WaitRatioAvg: Relative amount of the total time the average shard spent
	// waiting to be scheduled.
	WaitRatioAvg float64

	// WaitRatioMax: Relative amount of the total time the slowest shard spent
	// waiting to be scheduled.
	WaitRatioMax float64

	// WriteAvg: Duration the average shard spent on writing output.
	WriteAvg time.Duration

	// WriteMax: Duration the slowest shard spent on writing output.
	WriteMax time.Duration

	// WriteRatioAvg: Relative amount of the total time the average shard spent
	// on writing output.
	WriteRatioAvg float64

	// WriteRatioMax: Relative amount of the total time the slowest shard spent
	// on writing output.
	WriteRatioMax float64
}
567
// ExplainQueryStep describes one step of a query stage.
type ExplainQueryStep struct {
	// Kind: Machine-readable operation type.
	Kind string

	// Substeps: Human-readable stage descriptions.
	Substeps []string
}
576
// QueryTimelineSample represents a sample of execution statistics at a point in time.
type QueryTimelineSample struct {

	// ActiveUnits: Total number of units currently being processed by
	// workers, represented as largest value since last sample.
	ActiveUnits int64

	// CompletedUnits: Total parallel units of work completed by this query.
	CompletedUnits int64

	// Elapsed: Time elapsed since start of query execution.
	Elapsed time.Duration

	// PendingUnits: Total parallel units of work remaining for the active stages.
	PendingUnits int64

	// SlotMillis: Cumulative slot-milliseconds consumed by the query.
	SlotMillis int64
}
595
// ReservationUsage contains information about a job's usage of a single reservation.
type ReservationUsage struct {
	// SlotMillis reports the slot milliseconds utilized within the given reservation.
	SlotMillis int64
	// Name indicates the utilized reservation name, or "unreserved" for on-demand usage.
	Name string
}
603
604func bqToReservationUsage(ru []*bq.JobStatisticsReservationUsage) []*ReservationUsage {
605	var usage []*ReservationUsage
606	for _, in := range ru {
607		usage = append(usage, &ReservationUsage{
608			SlotMillis: in.SlotMs,
609			Name:       in.Name,
610		})
611	}
612	return usage
613}
614
// ScriptStatistics report information about script-based query jobs.
type ScriptStatistics struct {
	// EvaluationKind is copied verbatim from the API's script statistics.
	EvaluationKind string
	// StackFrames holds one converted entry per stack frame reported by the API.
	StackFrames []*ScriptStackFrame
}
620
621func bqToScriptStatistics(bs *bq.ScriptStatistics) *ScriptStatistics {
622	if bs == nil {
623		return nil
624	}
625	ss := &ScriptStatistics{
626		EvaluationKind: bs.EvaluationKind,
627	}
628	for _, f := range bs.StackFrames {
629		ss.StackFrames = append(ss.StackFrames, bqToScriptStackFrame(f))
630	}
631	return ss
632}
633
// ScriptStackFrame represents the location of the statement/expression being evaluated.
//
// Line and column numbers are defined as follows:
//
// - Line and column numbers start with one.  That is, line 1 column 1 denotes
//   the start of the script.
// - When inside a stored procedure, all line/column numbers are relative
//   to the procedure body, not the script in which the procedure was defined.
// - Start/end positions exclude leading/trailing comments and whitespace.
//   The end position always ends with a ";", when present.
// - Multi-byte Unicode characters are treated as just one column.
// - If the original script (or procedure definition) contains TAB characters,
//   a tab "snaps" the indentation forward to the nearest multiple of 8
//   characters, plus 1. For example, a TAB on column 1, 2, 3, 4, 5, 6, 7, or 8
//   will advance the next character to column 9.  A TAB on column 9, 10, 11,
//   12, 13, 14, 15, or 16 will advance the next character to column 17.
type ScriptStackFrame struct {
	StartLine   int64
	StartColumn int64
	EndLine     int64
	EndColumn   int64
	// Name of the active procedure.  Empty if in a top-level script.
	ProcedureID string
	// Text of the current statement/expression.
	Text string
}
660
661func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame {
662	if bsf == nil {
663		return nil
664	}
665	return &ScriptStackFrame{
666		StartLine:   bsf.StartLine,
667		StartColumn: bsf.StartColumn,
668		EndLine:     bsf.EndLine,
669		EndColumn:   bsf.EndColumn,
670		ProcedureID: bsf.ProcedureId,
671		Text:        bsf.Text,
672	}
673}
674
// DMLStatistics contains counts of row mutations triggered by a DML query statement.
type DMLStatistics struct {
	// InsertedRowCount is the number of rows added by the statement.
	InsertedRowCount int64
	// DeletedRowCount is the number of rows removed by the statement.
	DeletedRowCount int64
	// UpdatedRowCount is the number of rows changed by the statement.
	UpdatedRowCount int64
}
684
685func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics {
686	if q == nil {
687		return nil
688	}
689	return &DMLStatistics{
690		InsertedRowCount: q.InsertedRowCount,
691		DeletedRowCount:  q.DeletedRowCount,
692		UpdatedRowCount:  q.UpdatedRowCount,
693	}
694}
695
// Marker methods: these empty implementations make the three statistics types
// satisfy the Statistics interface.
func (*ExtractStatistics) implementsStatistics() {}
func (*LoadStatistics) implementsStatistics()    {}
func (*QueryStatistics) implementsStatistics()   {}
699
700// Jobs lists jobs within a project.
701func (c *Client) Jobs(ctx context.Context) *JobIterator {
702	it := &JobIterator{
703		ctx:       ctx,
704		c:         c,
705		ProjectID: c.projectID,
706	}
707	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
708		it.fetch,
709		func() int { return len(it.items) },
710		func() interface{} { b := it.items; it.items = nil; return b })
711	return it
712}
713
// JobIterator iterates over jobs in a project. Create one with Client.Jobs
// and adjust the exported fields to select which jobs are listed.
type JobIterator struct {
	ProjectID       string    // Project ID of the jobs to list. Default is the client's project.
	AllUsers        bool      // Whether to list jobs owned by all users in the project, or just the current caller.
	State           State     // List only jobs in the given state. Defaults to all states.
	MinCreationTime time.Time // List only jobs created after this time. Sent to the API with millisecond precision.
	MaxCreationTime time.Time // List only jobs created before this time. Sent to the API with millisecond precision.
	ParentJobID     string    // List only jobs that are children of a given scripting job.

	// Unexported iteration state, initialized by Client.Jobs. items buffers
	// the jobs fetched but not yet returned by Next.
	ctx      context.Context
	c        *Client
	pageInfo *iterator.PageInfo
	nextFunc func() error
	items    []*Job
}
729
// PageInfo is a getter for the JobIterator's PageInfo, the paging state that
// Client.Jobs created for this iterator.
func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
732
733// Next returns the next Job. Its second return value is iterator.Done if
734// there are no more results. Once Next returns Done, all subsequent calls will
735// return Done.
736func (it *JobIterator) Next() (*Job, error) {
737	if err := it.nextFunc(); err != nil {
738		return nil, err
739	}
740	item := it.items[0]
741	it.items = it.items[1:]
742	return item, nil
743}
744
745func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
746	var st string
747	switch it.State {
748	case StateUnspecified:
749		st = ""
750	case Pending:
751		st = "pending"
752	case Running:
753		st = "running"
754	case Done:
755		st = "done"
756	default:
757		return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
758	}
759
760	req := it.c.bqs.Jobs.List(it.ProjectID).
761		Context(it.ctx).
762		PageToken(pageToken).
763		Projection("full").
764		AllUsers(it.AllUsers)
765	if st != "" {
766		req.StateFilter(st)
767	}
768	if !it.MinCreationTime.IsZero() {
769		req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6))
770	}
771	if !it.MaxCreationTime.IsZero() {
772		req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6))
773	}
774	setClientHeader(req.Header())
775	if pageSize > 0 {
776		req.MaxResults(int64(pageSize))
777	}
778	if it.ParentJobID != "" {
779		req.ParentJobId(it.ParentJobID)
780	}
781	res, err := req.Do()
782	if err != nil {
783		return "", err
784	}
785	for _, j := range res.Jobs {
786		job, err := convertListedJob(j, it.c)
787		if err != nil {
788			return "", err
789		}
790		it.items = append(it.items, job)
791	}
792	return res.NextPageToken, nil
793}
794
// convertListedJob builds a Job from one entry of a job listing by passing
// the entry's reference, configuration, status, statistics and user email to
// bqToJob2.
func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) {
	return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c)
}
798
799func (c *Client) getJobInternal(ctx context.Context, jobID, location string, fields ...googleapi.Field) (*bq.Job, error) {
800	var job *bq.Job
801	call := c.bqs.Jobs.Get(c.projectID, jobID).Context(ctx)
802	if location != "" {
803		call = call.Location(location)
804	}
805	if len(fields) > 0 {
806		call = call.Fields(fields...)
807	}
808	setClientHeader(call.Header())
809	err := runWithRetry(ctx, func() (err error) {
810		job, err = call.Do()
811		return err
812	})
813	if err != nil {
814		return nil, err
815	}
816	return job, nil
817}
818
// bqToJob builds a Job from a full API job resource by passing its reference,
// configuration, status, statistics and user email to bqToJob2.
func bqToJob(q *bq.Job, c *Client) (*Job, error) {
	return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c)
}
822
823func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) {
824	j := &Job{
825		projectID: qr.ProjectId,
826		jobID:     qr.JobId,
827		location:  qr.Location,
828		c:         c,
829		email:     email,
830	}
831	j.setConfig(qc)
832	if err := j.setStatus(qs); err != nil {
833		return nil, err
834	}
835	j.setStatistics(qt, c)
836	return j, nil
837}
838
839func (j *Job) setConfig(config *bq.JobConfiguration) {
840	if config == nil {
841		return
842	}
843	j.config = config
844}
845
846func (j *Job) isQuery() bool {
847	return j.config != nil && j.config.Query != nil
848}
849
// stateMap translates the state strings found in API job statuses into State
// values. setStatus reports an error for any string missing from this map.
var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}
851
852func (j *Job) setStatus(qs *bq.JobStatus) error {
853	if qs == nil {
854		return nil
855	}
856	state, ok := stateMap[qs.State]
857	if !ok {
858		return fmt.Errorf("unexpected job state: %v", qs.State)
859	}
860	j.lastStatus = &JobStatus{
861		State: state,
862		err:   nil,
863	}
864	if err := bqToError(qs.ErrorResult); state == Done && err != nil {
865		j.lastStatus.err = err
866	}
867	for _, ep := range qs.Errors {
868		j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep))
869	}
870	return nil
871}
872
873func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
874	if s == nil || j.lastStatus == nil {
875		return
876	}
877	js := &JobStatistics{
878		CreationTime:        unixMillisToTime(s.CreationTime),
879		StartTime:           unixMillisToTime(s.StartTime),
880		EndTime:             unixMillisToTime(s.EndTime),
881		TotalBytesProcessed: s.TotalBytesProcessed,
882		NumChildJobs:        s.NumChildJobs,
883		ParentJobID:         s.ParentJobId,
884		ScriptStatistics:    bqToScriptStatistics(s.ScriptStatistics),
885		ReservationUsage:    bqToReservationUsage(s.ReservationUsage),
886		TransactionInfo:     bqToTransactionInfo(s.TransactionInfo),
887	}
888	switch {
889	case s.Extract != nil:
890		js.Details = &ExtractStatistics{
891			DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts),
892		}
893	case s.Load != nil:
894		js.Details = &LoadStatistics{
895			InputFileBytes: s.Load.InputFileBytes,
896			InputFiles:     s.Load.InputFiles,
897			OutputBytes:    s.Load.OutputBytes,
898			OutputRows:     s.Load.OutputRows,
899		}
900	case s.Query != nil:
901		var names []string
902		for _, qp := range s.Query.UndeclaredQueryParameters {
903			names = append(names, qp.Name)
904		}
905		var tables []*Table
906		for _, tr := range s.Query.ReferencedTables {
907			tables = append(tables, bqToTable(tr, c))
908		}
909		js.Details = &QueryStatistics{
910			BillingTier:                   s.Query.BillingTier,
911			CacheHit:                      s.Query.CacheHit,
912			DDLTargetTable:                bqToTable(s.Query.DdlTargetTable, c),
913			DDLOperationPerformed:         s.Query.DdlOperationPerformed,
914			DDLTargetRoutine:              bqToRoutine(s.Query.DdlTargetRoutine, c),
915			StatementType:                 s.Query.StatementType,
916			TotalBytesBilled:              s.Query.TotalBytesBilled,
917			TotalBytesProcessed:           s.Query.TotalBytesProcessed,
918			TotalBytesProcessedAccuracy:   s.Query.TotalBytesProcessedAccuracy,
919			NumDMLAffectedRows:            s.Query.NumDmlAffectedRows,
920			DMLStats:                      bqToDMLStatistics(s.Query.DmlStats),
921			QueryPlan:                     queryPlanFromProto(s.Query.QueryPlan),
922			Schema:                        bqToSchema(s.Query.Schema),
923			SlotMillis:                    s.Query.TotalSlotMs,
924			Timeline:                      timelineFromProto(s.Query.Timeline),
925			ReferencedTables:              tables,
926			UndeclaredQueryParameterNames: names,
927		}
928	}
929	j.lastStatus.Statistics = js
930}
931
932func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage {
933	var res []*ExplainQueryStage
934	for _, s := range stages {
935		var steps []*ExplainQueryStep
936		for _, p := range s.Steps {
937			steps = append(steps, &ExplainQueryStep{
938				Kind:     p.Kind,
939				Substeps: p.Substeps,
940			})
941		}
942		res = append(res, &ExplainQueryStage{
943			CompletedParallelInputs:   s.CompletedParallelInputs,
944			ComputeAvg:                time.Duration(s.ComputeMsAvg) * time.Millisecond,
945			ComputeMax:                time.Duration(s.ComputeMsMax) * time.Millisecond,
946			ComputeRatioAvg:           s.ComputeRatioAvg,
947			ComputeRatioMax:           s.ComputeRatioMax,
948			EndTime:                   time.Unix(0, s.EndMs*1e6),
949			ID:                        s.Id,
950			InputStages:               s.InputStages,
951			Name:                      s.Name,
952			ParallelInputs:            s.ParallelInputs,
953			ReadAvg:                   time.Duration(s.ReadMsAvg) * time.Millisecond,
954			ReadMax:                   time.Duration(s.ReadMsMax) * time.Millisecond,
955			ReadRatioAvg:              s.ReadRatioAvg,
956			ReadRatioMax:              s.ReadRatioMax,
957			RecordsRead:               s.RecordsRead,
958			RecordsWritten:            s.RecordsWritten,
959			ShuffleOutputBytes:        s.ShuffleOutputBytes,
960			ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled,
961			StartTime:                 time.Unix(0, s.StartMs*1e6),
962			Status:                    s.Status,
963			Steps:                     steps,
964			WaitAvg:                   time.Duration(s.WaitMsAvg) * time.Millisecond,
965			WaitMax:                   time.Duration(s.WaitMsMax) * time.Millisecond,
966			WaitRatioAvg:              s.WaitRatioAvg,
967			WaitRatioMax:              s.WaitRatioMax,
968			WriteAvg:                  time.Duration(s.WriteMsAvg) * time.Millisecond,
969			WriteMax:                  time.Duration(s.WriteMsMax) * time.Millisecond,
970			WriteRatioAvg:             s.WriteRatioAvg,
971			WriteRatioMax:             s.WriteRatioMax,
972		})
973	}
974	return res
975}
976
977func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample {
978	var res []*QueryTimelineSample
979	for _, s := range timeline {
980		res = append(res, &QueryTimelineSample{
981			ActiveUnits:    s.ActiveUnits,
982			CompletedUnits: s.CompletedUnits,
983			Elapsed:        time.Duration(s.ElapsedMs) * time.Millisecond,
984			PendingUnits:   s.PendingUnits,
985			SlotMillis:     s.TotalSlotMs,
986		})
987	}
988	return res
989}
990
// TransactionInfo contains information about a multi-statement transaction that may be associated with a job.
type TransactionInfo struct {
	// TransactionID is the system-generated identifier for the transaction.
	TransactionID string
}
996
997func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo {
998	if in == nil {
999		return nil
1000	}
1001	return &TransactionInfo{
1002		TransactionID: in.TransactionId,
1003	}
1004}
1005