1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"errors"
19	"fmt"
20	"time"
21
22	"cloud.google.com/go/internal"
23	"cloud.google.com/go/internal/trace"
24	gax "github.com/googleapis/gax-go"
25	"golang.org/x/net/context"
26	bq "google.golang.org/api/bigquery/v2"
27	"google.golang.org/api/googleapi"
28	"google.golang.org/api/iterator"
29)
30
// A Job represents an operation which has been submitted to BigQuery for processing.
//
// A Job remembers the client it was obtained from, the identifying parts of
// its job reference, the email of its creator, the raw API configuration it
// was built with, and the status most recently retrieved for it.
type Job struct {
	// c is the client used for RPCs. projectID, jobID and location are copied
	// from the job's reference, and email from the job resource. config is
	// the raw API configuration (nil if none was received). lastStatus is the
	// status most recently recorded for the job (see LastStatus).
	c          *Client
	projectID  string
	jobID      string
	location   string
	email      string
	config     *bq.JobConfiguration
	lastStatus *JobStatus
}
41
42// JobFromID creates a Job which refers to an existing BigQuery job. The job
43// need not have been created by this package. For example, the job may have
44// been created in the BigQuery console.
45//
46// For jobs whose location is other than "US" or "EU", set Client.Location or use
47// JobFromIDLocation.
48func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
49	return c.JobFromIDLocation(ctx, id, c.Location)
50}
51
52// JobFromIDLocation creates a Job which refers to an existing BigQuery job. The job
53// need not have been created by this package (for example, it may have
54// been created in the BigQuery console), but it must exist in the specified location.
55func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) {
56	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromIDLocation")
57	defer func() { trace.EndSpan(ctx, err) }()
58
59	bqjob, err := c.getJobInternal(ctx, id, location, "configuration", "jobReference", "status", "statistics")
60	if err != nil {
61		return nil, err
62	}
63	return bqToJob(bqjob, c)
64}
65
// ID returns the job's ID, as taken from the job reference the Job was
// built from.
func (j *Job) ID() string {
	return j.jobID
}
70
// Location returns the job's location, as taken from the job reference the
// Job was built from.
func (j *Job) Location() string {
	return j.location
}
75
// Email returns the email of the job's creator, as reported in the job
// resource the Job was built from.
func (j *Job) Email() string {
	return j.email
}
80
// State is one of a sequence of states that a Job progresses through as it is processed.
type State int

// The possible values of State. Pending, Running and Done correspond to the
// API state strings "PENDING", "RUNNING" and "DONE" respectively.
const (
	StateUnspecified State = iota // used only as a default in JobIterator
	// Pending corresponds to the API state "PENDING".
	Pending
	// Running corresponds to the API state "RUNNING".
	Running
	// Done corresponds to the API state "DONE". JobStatus.Done reports true
	// for this state.
	Done
)
90
// JobStatus contains the current State of a job, and errors encountered while processing that job.
type JobStatus struct {
	// State is the state the job was in when this status was retrieved.
	State State

	// err is the error that caused the job to complete unsuccessfully, if any.
	// It is only set for jobs in the Done state and is exposed through Err.
	err error

	// All errors encountered during the running of the job.
	// Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful.
	Errors []*Error

	// Statistics about the job. It is nil if no statistics have been recorded
	// for this status.
	Statistics *JobStatistics
}
104
// JobConfig contains configuration information for a job. It is implemented by
// *CopyConfig, *ExtractConfig, *LoadConfig and *QueryConfig.
//
// The interface's only method is unexported, so it cannot be implemented
// outside this package.
type JobConfig interface {
	isJobConfig()
}

// The methods below do nothing; they only mark the four configuration types
// as implementations of JobConfig.
func (*CopyConfig) isJobConfig()    {}
func (*ExtractConfig) isJobConfig() {}
func (*LoadConfig) isJobConfig()    {}
func (*QueryConfig) isJobConfig()   {}
115
// Config returns the configuration information for j.
//
// The result is a *CopyConfig, *ExtractConfig, *LoadConfig or *QueryConfig,
// depending on the kind of job. Config returns a nil JobConfig and a nil error
// if j has no stored configuration or the configuration is of none of those
// kinds.
func (j *Job) Config() (JobConfig, error) {
	return bqToJobConfig(j.config, j.c)
}
120
121func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) {
122	switch {
123	case q == nil:
124		return nil, nil
125	case q.Copy != nil:
126		return bqToCopyConfig(q, c), nil
127	case q.Extract != nil:
128		return bqToExtractConfig(q, c), nil
129	case q.Load != nil:
130		return bqToLoadConfig(q, c), nil
131	case q.Query != nil:
132		return bqToQueryConfig(q, c)
133	default:
134		return nil, nil
135	}
136}
137
// JobIDConfig describes how to create an ID for a job.
type JobIDConfig struct {
	// JobID is the ID to use for the job. If empty, a random job ID will be generated.
	JobID string

	// If AddJobIDSuffix is true, then a random string will be appended to JobID,
	// separated from it by a hyphen. It has no effect when JobID is empty,
	// because the whole ID is random in that case.
	AddJobIDSuffix bool

	// Location is the location for the job. If empty, Client.Location is used.
	Location string
}
149
150// createJobRef creates a JobReference.
151func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference {
152	// We don't check whether projectID is empty; the server will return an
153	// error when it encounters the resulting JobReference.
154	loc := j.Location
155	if loc == "" { // Use Client.Location as a default.
156		loc = c.Location
157	}
158	jr := &bq.JobReference{ProjectId: c.projectID, Location: loc}
159	if j.JobID == "" {
160		jr.JobId = randomIDFn()
161	} else if j.AddJobIDSuffix {
162		jr.JobId = j.JobID + "-" + randomIDFn()
163	} else {
164		jr.JobId = j.JobID
165	}
166	return jr
167}
168
// Done reports whether the job has completed, that is, whether its State is Done.
// After Done returns true, the Err method will return an error if the job completed unsuccessfully.
func (s *JobStatus) Done() bool {
	return s.State == Done
}
174
// Err returns the error that caused the job to complete unsuccessfully (if any).
// It returns nil if no such error was recorded for this status.
func (s *JobStatus) Err() error {
	return s.err
}
179
180// Status retrieves the current status of the job from BigQuery. It fails if the Status could not be determined.
181func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) {
182	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status")
183	defer func() { trace.EndSpan(ctx, err) }()
184
185	bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, "status", "statistics")
186	if err != nil {
187		return nil, err
188	}
189	if err := j.setStatus(bqjob.Status); err != nil {
190		return nil, err
191	}
192	j.setStatistics(bqjob.Statistics, j.c)
193	return j.lastStatus, nil
194}
195
// LastStatus returns the most recently retrieved status of the job. The status is
// retrieved when a new job is created, or when JobFromID or Job.Status is called.
// Call Job.Status to get the most up-to-date information about a job.
//
// LastStatus makes no RPC. It returns nil if no status has been recorded for
// the job.
func (j *Job) LastStatus() *JobStatus {
	return j.lastStatus
}
202
203// Cancel requests that a job be cancelled. This method returns without waiting for
204// cancellation to take effect. To check whether the job has terminated, use Job.Status.
205// Cancelled jobs may still incur costs.
206func (j *Job) Cancel(ctx context.Context) error {
207	// Jobs.Cancel returns a job entity, but the only relevant piece of
208	// data it may contain (the status of the job) is unreliable.  From the
209	// docs: "This call will return immediately, and the client will need
210	// to poll for the job status to see if the cancel completed
211	// successfully".  So it would be misleading to return a status.
212	call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID).
213		Location(j.location).
214		Fields(). // We don't need any of the response data.
215		Context(ctx)
216	setClientHeader(call.Header())
217	return runWithRetry(ctx, func() error {
218		_, err := call.Do()
219		return err
220	})
221}
222
223// Wait blocks until the job or the context is done. It returns the final status
224// of the job.
225// If an error occurs while retrieving the status, Wait returns that error. But
226// Wait returns nil if the status was retrieved successfully, even if
227// status.Err() != nil. So callers must check both errors. See the example.
228func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) {
229	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait")
230	defer func() { trace.EndSpan(ctx, err) }()
231
232	if j.isQuery() {
233		// We can avoid polling for query jobs.
234		if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil {
235			return nil, err
236		}
237		// Note: extra RPC even if you just want to wait for the query to finish.
238		js, err := j.Status(ctx)
239		if err != nil {
240			return nil, err
241		}
242		return js, nil
243	}
244	// Non-query jobs must poll.
245	err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
246		js, err = j.Status(ctx)
247		if err != nil {
248			return true, err
249		}
250		if js.Done() {
251			return true, nil
252		}
253		return false, nil
254	})
255	if err != nil {
256		return nil, err
257	}
258	return js, nil
259}
260
261// Read fetches the results of a query job.
262// If j is not a query job, Read returns an error.
263func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) {
264	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read")
265	defer func() { trace.EndSpan(ctx, err) }()
266
267	return j.read(ctx, j.waitForQuery, fetchPage)
268}
269
270func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) {
271	if !j.isQuery() {
272		return nil, errors.New("bigquery: cannot read from a non-query job")
273	}
274	destTable := j.config.Query.DestinationTable
275	// The destination table should only be nil if there was a query error.
276	projectID := j.projectID
277	if destTable != nil && projectID != destTable.ProjectId {
278		return nil, fmt.Errorf("bigquery: job project ID is %q, but destination table's is %q", projectID, destTable.ProjectId)
279	}
280	schema, totalRows, err := waitForQuery(ctx, projectID)
281	if err != nil {
282		return nil, err
283	}
284	if destTable == nil {
285		return nil, errors.New("bigquery: query job missing destination table")
286	}
287	dt := bqToTable(destTable, j.c)
288	if totalRows == 0 {
289		pf = nil
290	}
291	it := newRowIterator(ctx, dt, pf)
292	it.Schema = schema
293	it.TotalRows = totalRows
294	return it, nil
295}
296
297// waitForQuery waits for the query job to complete and returns its schema. It also
298// returns the total number of rows in the result set.
299func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) {
300	// Use GetQueryResults only to wait for completion, not to read results.
301	call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0)
302	setClientHeader(call.Header())
303	backoff := gax.Backoff{
304		Initial:    1 * time.Second,
305		Multiplier: 2,
306		Max:        60 * time.Second,
307	}
308	var res *bq.GetQueryResultsResponse
309	err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
310		res, err = call.Do()
311		if err != nil {
312			return !retryableError(err), err
313		}
314		if !res.JobComplete { // GetQueryResults may return early without error; retry.
315			return false, nil
316		}
317		return true, nil
318	})
319	if err != nil {
320		return nil, 0, err
321	}
322	return bqToSchema(res.Schema), res.TotalRows, nil
323}
324
// JobStatistics contains statistics about a job.
type JobStatistics struct {
	// CreationTime, StartTime and EndTime are converted from the millisecond
	// timestamps reported by the service. TotalBytesProcessed is copied from
	// the job-level statistics.
	CreationTime        time.Time
	StartTime           time.Time
	EndTime             time.Time
	TotalBytesProcessed int64

	// Details holds the statistics specific to the kind of job: an
	// *ExtractStatistics, *LoadStatistics or *QueryStatistics. It is nil if
	// the service reported none of these.
	Details Statistics
}
334
// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
// The interface is implemented by pointers to those types; its only method
// is unexported, so it cannot be implemented outside this package.
type Statistics interface {
	implementsStatistics()
}
339
// ExtractStatistics contains statistics about an extract job. A pointer to it
// is stored in JobStatistics.Details for extract jobs.
type ExtractStatistics struct {
	// The number of files per destination URI or URI pattern specified in the
	// extract configuration. These values will be in the same order as the
	// URIs specified in the 'destinationUris' field.
	DestinationURIFileCounts []int64
}
347
// LoadStatistics contains statistics about a load job. A pointer to it is
// stored in JobStatistics.Details for load jobs.
type LoadStatistics struct {
	// The number of bytes of source data in a load job.
	InputFileBytes int64

	// The number of source files in a load job.
	InputFiles int64

	// Size of the loaded data in bytes. Note that while a load job is in the
	// running state, this value may change.
	OutputBytes int64

	// The number of rows imported in a load job. Note that while an import job is
	// in the running state, this value may change.
	OutputRows int64
}
364
// QueryStatistics contains statistics about a query job. A pointer to it is
// stored in JobStatistics.Details for query jobs.
type QueryStatistics struct {
	// Billing tier for the job.
	BillingTier int64

	// Whether the query result was fetched from the query cache.
	CacheHit bool

	// The type of query statement, if valid.
	StatementType string

	// Total bytes billed for the job.
	TotalBytesBilled int64

	// Total bytes processed for the job.
	TotalBytesProcessed int64

	// Describes execution plan for the query.
	QueryPlan []*ExplainQueryStage

	// The number of rows affected by a DML statement. Present only for DML
	// statements INSERT, UPDATE or DELETE.
	NumDMLAffectedRows int64

	// Describes a timeline of job execution.
	Timeline []*QueryTimelineSample

	// ReferencedTables: [Output-only, Experimental] Referenced tables for
	// the job. Queries that reference more than 50 tables will not have a
	// complete list.
	ReferencedTables []*Table

	// The schema of the results. Present only for successful dry run of
	// non-legacy SQL queries.
	Schema Schema

	// Slot-milliseconds consumed by this query job.
	SlotMillis int64

	// Standard SQL: list of undeclared query parameter names detected during a
	// dry run validation.
	UndeclaredQueryParameterNames []string

	// DDL target table.
	DDLTargetTable *Table

	// DDL Operation performed on the target table.  Used to report how the
	// query impacted the DDL target table.
	DDLOperationPerformed string
}
415
// ExplainQueryStage describes one stage of a query.
//
// The time.Duration fields and the StartTime/EndTime fields are converted
// from values the service reports in milliseconds.
type ExplainQueryStage struct {
	// CompletedParallelInputs: Number of parallel input segments completed.
	CompletedParallelInputs int64

	// ComputeAvg: Duration the average shard spent on CPU-bound tasks.
	ComputeAvg time.Duration

	// ComputeMax: Duration the slowest shard spent on CPU-bound tasks.
	ComputeMax time.Duration

	// Relative amount of the total time the average shard spent on CPU-bound tasks.
	ComputeRatioAvg float64

	// Relative amount of the total time the slowest shard spent on CPU-bound tasks.
	ComputeRatioMax float64

	// EndTime: Stage end time.
	EndTime time.Time

	// Unique ID for stage within plan.
	ID int64

	// InputStages: IDs for stages that are inputs to this stage.
	InputStages []int64

	// Human-readable name for stage.
	Name string

	// ParallelInputs: Number of parallel input segments to be processed.
	ParallelInputs int64

	// ReadAvg: Duration the average shard spent reading input.
	ReadAvg time.Duration

	// ReadMax: Duration the slowest shard spent reading input.
	ReadMax time.Duration

	// Relative amount of the total time the average shard spent reading input.
	ReadRatioAvg float64

	// Relative amount of the total time the slowest shard spent reading input.
	ReadRatioMax float64

	// Number of records read into the stage.
	RecordsRead int64

	// Number of records written by the stage.
	RecordsWritten int64

	// ShuffleOutputBytes: Total number of bytes written to shuffle.
	ShuffleOutputBytes int64

	// ShuffleOutputBytesSpilled: Total number of bytes written to shuffle
	// and spilled to disk.
	ShuffleOutputBytesSpilled int64

	// StartTime: Stage start time.
	StartTime time.Time

	// Current status for the stage.
	Status string

	// List of operations within the stage in dependency order (approximately
	// chronological).
	Steps []*ExplainQueryStep

	// WaitAvg: Duration the average shard spent waiting to be scheduled.
	WaitAvg time.Duration

	// WaitMax: Duration the slowest shard spent waiting to be scheduled.
	WaitMax time.Duration

	// Relative amount of the total time the average shard spent waiting to be scheduled.
	WaitRatioAvg float64

	// Relative amount of the total time the slowest shard spent waiting to be scheduled.
	WaitRatioMax float64

	// WriteAvg: Duration the average shard spent on writing output.
	WriteAvg time.Duration

	// WriteMax: Duration the slowest shard spent on writing output.
	WriteMax time.Duration

	// Relative amount of the total time the average shard spent on writing output.
	WriteRatioAvg float64

	// Relative amount of the total time the slowest shard spent on writing output.
	WriteRatioMax float64
}
507
// ExplainQueryStep describes one step of a query stage. Steps appear in
// ExplainQueryStage.Steps.
type ExplainQueryStep struct {
	// Machine-readable operation type.
	Kind string

	// Human-readable stage descriptions.
	Substeps []string
}
516
// QueryTimelineSample represents a sample of execution statistics at a point in time.
// Samples appear in QueryStatistics.Timeline.
type QueryTimelineSample struct {

	// Total number of units currently being processed by workers, represented as largest value since last sample.
	ActiveUnits int64

	// Total parallel units of work completed by this query.
	CompletedUnits int64

	// Time elapsed since start of query execution. It is converted from a
	// value the service reports in milliseconds.
	Elapsed time.Duration

	// Total parallel units of work remaining for the active stages.
	PendingUnits int64

	// Cumulative slot-milliseconds consumed by the query.
	SlotMillis int64
}
535
// The methods below do nothing; they only mark the three statistics types as
// implementations of Statistics.
func (*ExtractStatistics) implementsStatistics() {}
func (*LoadStatistics) implementsStatistics()    {}
func (*QueryStatistics) implementsStatistics()   {}
539
540// Jobs lists jobs within a project.
541func (c *Client) Jobs(ctx context.Context) *JobIterator {
542	it := &JobIterator{
543		ctx:       ctx,
544		c:         c,
545		ProjectID: c.projectID,
546	}
547	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
548		it.fetch,
549		func() int { return len(it.items) },
550		func() interface{} { b := it.items; it.items = nil; return b })
551	return it
552}
553
// JobIterator iterates over jobs in a project. It is created by Client.Jobs;
// the exported fields select which jobs are listed and are read each time a
// page of results is fetched.
type JobIterator struct {
	ProjectID       string    // Project ID of the jobs to list. Default is the client's project.
	AllUsers        bool      // Whether to list jobs owned by all users in the project, or just the current caller.
	State           State     // List only jobs in the given state. Defaults to all states.
	MinCreationTime time.Time // List only jobs created after this time.
	MaxCreationTime time.Time // List only jobs created before this time.

	// ctx is the context used for list requests and c the client issuing
	// them. pageInfo and nextFunc come from iterator.NewPageInfo. items
	// buffers the jobs that have been fetched but not yet returned by Next.
	ctx      context.Context
	c        *Client
	pageInfo *iterator.PageInfo
	nextFunc func() error
	items    []*Job
}
568
// PageInfo returns the iterator's paging state, as created by
// iterator.NewPageInfo. See the google.golang.org/api/iterator package for
// how to use it.
func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
570
571func (it *JobIterator) Next() (*Job, error) {
572	if err := it.nextFunc(); err != nil {
573		return nil, err
574	}
575	item := it.items[0]
576	it.items = it.items[1:]
577	return item, nil
578}
579
580func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
581	var st string
582	switch it.State {
583	case StateUnspecified:
584		st = ""
585	case Pending:
586		st = "pending"
587	case Running:
588		st = "running"
589	case Done:
590		st = "done"
591	default:
592		return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
593	}
594
595	req := it.c.bqs.Jobs.List(it.ProjectID).
596		Context(it.ctx).
597		PageToken(pageToken).
598		Projection("full").
599		AllUsers(it.AllUsers)
600	if st != "" {
601		req.StateFilter(st)
602	}
603	if !it.MinCreationTime.IsZero() {
604		req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6))
605	}
606	if !it.MaxCreationTime.IsZero() {
607		req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6))
608	}
609	setClientHeader(req.Header())
610	if pageSize > 0 {
611		req.MaxResults(int64(pageSize))
612	}
613	res, err := req.Do()
614	if err != nil {
615		return "", err
616	}
617	for _, j := range res.Jobs {
618		job, err := convertListedJob(j, it.c)
619		if err != nil {
620			return "", err
621		}
622		it.items = append(it.items, job)
623	}
624	return res.NextPageToken, nil
625}
626
// convertListedJob converts an entry of a job-list response into a Job
// attached to client c. It fails if the entry's status cannot be converted.
func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) {
	return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c)
}
630
631func (c *Client) getJobInternal(ctx context.Context, jobID, location string, fields ...googleapi.Field) (*bq.Job, error) {
632	var job *bq.Job
633	call := c.bqs.Jobs.Get(c.projectID, jobID).Context(ctx)
634	if location != "" {
635		call = call.Location(location)
636	}
637	if len(fields) > 0 {
638		call = call.Fields(fields...)
639	}
640	setClientHeader(call.Header())
641	err := runWithRetry(ctx, func() (err error) {
642		job, err = call.Do()
643		return err
644	})
645	if err != nil {
646		return nil, err
647	}
648	return job, nil
649}
650
// bqToJob converts a raw API job resource into a Job attached to client c.
// q.JobReference must be non-nil. It fails if the job's status cannot be
// converted.
func bqToJob(q *bq.Job, c *Client) (*Job, error) {
	return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c)
}
654
655func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) {
656	j := &Job{
657		projectID: qr.ProjectId,
658		jobID:     qr.JobId,
659		location:  qr.Location,
660		c:         c,
661		email:     email,
662	}
663	j.setConfig(qc)
664	if err := j.setStatus(qs); err != nil {
665		return nil, err
666	}
667	j.setStatistics(qt, c)
668	return j, nil
669}
670
671func (j *Job) setConfig(config *bq.JobConfiguration) {
672	if config == nil {
673		return
674	}
675	j.config = config
676}
677
678func (j *Job) isQuery() bool {
679	return j.config != nil && j.config.Query != nil
680}
681
// stateMap translates the state strings used by the API into State values.
// Strings missing from the map are treated as errors by setStatus.
var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}
683
684func (j *Job) setStatus(qs *bq.JobStatus) error {
685	if qs == nil {
686		return nil
687	}
688	state, ok := stateMap[qs.State]
689	if !ok {
690		return fmt.Errorf("unexpected job state: %v", qs.State)
691	}
692	j.lastStatus = &JobStatus{
693		State: state,
694		err:   nil,
695	}
696	if err := bqToError(qs.ErrorResult); state == Done && err != nil {
697		j.lastStatus.err = err
698	}
699	for _, ep := range qs.Errors {
700		j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep))
701	}
702	return nil
703}
704
705func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
706	if s == nil || j.lastStatus == nil {
707		return
708	}
709	js := &JobStatistics{
710		CreationTime:        unixMillisToTime(s.CreationTime),
711		StartTime:           unixMillisToTime(s.StartTime),
712		EndTime:             unixMillisToTime(s.EndTime),
713		TotalBytesProcessed: s.TotalBytesProcessed,
714	}
715	switch {
716	case s.Extract != nil:
717		js.Details = &ExtractStatistics{
718			DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts),
719		}
720	case s.Load != nil:
721		js.Details = &LoadStatistics{
722			InputFileBytes: s.Load.InputFileBytes,
723			InputFiles:     s.Load.InputFiles,
724			OutputBytes:    s.Load.OutputBytes,
725			OutputRows:     s.Load.OutputRows,
726		}
727	case s.Query != nil:
728		var names []string
729		for _, qp := range s.Query.UndeclaredQueryParameters {
730			names = append(names, qp.Name)
731		}
732		var tables []*Table
733		for _, tr := range s.Query.ReferencedTables {
734			tables = append(tables, bqToTable(tr, c))
735		}
736		js.Details = &QueryStatistics{
737			BillingTier:                   s.Query.BillingTier,
738			CacheHit:                      s.Query.CacheHit,
739			DDLTargetTable:                bqToTable(s.Query.DdlTargetTable, c),
740			DDLOperationPerformed:         s.Query.DdlOperationPerformed,
741			StatementType:                 s.Query.StatementType,
742			TotalBytesBilled:              s.Query.TotalBytesBilled,
743			TotalBytesProcessed:           s.Query.TotalBytesProcessed,
744			NumDMLAffectedRows:            s.Query.NumDmlAffectedRows,
745			QueryPlan:                     queryPlanFromProto(s.Query.QueryPlan),
746			Schema:                        bqToSchema(s.Query.Schema),
747			SlotMillis:                    s.Query.TotalSlotMs,
748			Timeline:                      timelineFromProto(s.Query.Timeline),
749			ReferencedTables:              tables,
750			UndeclaredQueryParameterNames: names,
751		}
752	}
753	j.lastStatus.Statistics = js
754}
755
756func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage {
757	var res []*ExplainQueryStage
758	for _, s := range stages {
759		var steps []*ExplainQueryStep
760		for _, p := range s.Steps {
761			steps = append(steps, &ExplainQueryStep{
762				Kind:     p.Kind,
763				Substeps: p.Substeps,
764			})
765		}
766		res = append(res, &ExplainQueryStage{
767			CompletedParallelInputs:   s.CompletedParallelInputs,
768			ComputeAvg:                time.Duration(s.ComputeMsAvg) * time.Millisecond,
769			ComputeMax:                time.Duration(s.ComputeMsMax) * time.Millisecond,
770			ComputeRatioAvg:           s.ComputeRatioAvg,
771			ComputeRatioMax:           s.ComputeRatioMax,
772			EndTime:                   time.Unix(0, s.EndMs*1e6),
773			ID:                        s.Id,
774			InputStages:               s.InputStages,
775			Name:                      s.Name,
776			ParallelInputs:            s.ParallelInputs,
777			ReadAvg:                   time.Duration(s.ReadMsAvg) * time.Millisecond,
778			ReadMax:                   time.Duration(s.ReadMsMax) * time.Millisecond,
779			ReadRatioAvg:              s.ReadRatioAvg,
780			ReadRatioMax:              s.ReadRatioMax,
781			RecordsRead:               s.RecordsRead,
782			RecordsWritten:            s.RecordsWritten,
783			ShuffleOutputBytes:        s.ShuffleOutputBytes,
784			ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled,
785			StartTime:                 time.Unix(0, s.StartMs*1e6),
786			Status:                    s.Status,
787			Steps:                     steps,
788			WaitAvg:                   time.Duration(s.WaitMsAvg) * time.Millisecond,
789			WaitMax:                   time.Duration(s.WaitMsMax) * time.Millisecond,
790			WaitRatioAvg:              s.WaitRatioAvg,
791			WaitRatioMax:              s.WaitRatioMax,
792			WriteAvg:                  time.Duration(s.WriteMsAvg) * time.Millisecond,
793			WriteMax:                  time.Duration(s.WriteMsMax) * time.Millisecond,
794			WriteRatioAvg:             s.WriteRatioAvg,
795			WriteRatioMax:             s.WriteRatioMax,
796		})
797	}
798	return res
799}
800
801func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample {
802	var res []*QueryTimelineSample
803	for _, s := range timeline {
804		res = append(res, &QueryTimelineSample{
805			ActiveUnits:    s.ActiveUnits,
806			CompletedUnits: s.CompletedUnits,
807			Elapsed:        time.Duration(s.ElapsedMs) * time.Millisecond,
808			PendingUnits:   s.PendingUnits,
809			SlotMillis:     s.TotalSlotMs,
810		})
811	}
812	return res
813}
814