1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "errors" 19 "fmt" 20 "time" 21 22 "cloud.google.com/go/internal" 23 "cloud.google.com/go/internal/trace" 24 gax "github.com/googleapis/gax-go" 25 "golang.org/x/net/context" 26 bq "google.golang.org/api/bigquery/v2" 27 "google.golang.org/api/googleapi" 28 "google.golang.org/api/iterator" 29) 30 31// A Job represents an operation which has been submitted to BigQuery for processing. 32type Job struct { 33 c *Client 34 projectID string 35 jobID string 36 location string 37 email string 38 config *bq.JobConfiguration 39 lastStatus *JobStatus 40} 41 42// JobFromID creates a Job which refers to an existing BigQuery job. The job 43// need not have been created by this package. For example, the job may have 44// been created in the BigQuery console. 45// 46// For jobs whose location is other than "US" or "EU", set Client.Location or use 47// JobFromIDLocation. 48func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) { 49 return c.JobFromIDLocation(ctx, id, c.Location) 50} 51 52// JobFromIDLocation creates a Job which refers to an existing BigQuery job. The job 53// need not have been created by this package (for example, it may have 54// been created in the BigQuery console), but it must exist in the specified location. 55func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) { 56 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromIDLocation") 57 defer func() { trace.EndSpan(ctx, err) }() 58 59 bqjob, err := c.getJobInternal(ctx, id, location, "configuration", "jobReference", "status", "statistics") 60 if err != nil { 61 return nil, err 62 } 63 return bqToJob(bqjob, c) 64} 65 66// ID returns the job's ID. 67func (j *Job) ID() string { 68 return j.jobID 69} 70 71// Location returns the job's location. 72func (j *Job) Location() string { 73 return j.location 74} 75 76// Email returns the email of the job's creator. 77func (j *Job) Email() string { 78 return j.email 79} 80 81// State is one of a sequence of states that a Job progresses through as it is processed. 82type State int 83 84const ( 85 StateUnspecified State = iota // used only as a default in JobIterator 86 Pending 87 Running 88 Done 89) 90 91// JobStatus contains the current State of a job, and errors encountered while processing that job. 92type JobStatus struct { 93 State State 94 95 err error 96 97 // All errors encountered during the running of the job. 98 // Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful. 99 Errors []*Error 100 101 // Statistics about the job. 102 Statistics *JobStatistics 103} 104 105// JobConfig contains configuration information for a job. It is implemented by 106// *CopyConfig, *ExtractConfig, *LoadConfig and *QueryConfig. 107type JobConfig interface { 108 isJobConfig() 109} 110 111func (*CopyConfig) isJobConfig() {} 112func (*ExtractConfig) isJobConfig() {} 113func (*LoadConfig) isJobConfig() {} 114func (*QueryConfig) isJobConfig() {} 115 116// Config returns the configuration information for j. 117func (j *Job) Config() (JobConfig, error) { 118 return bqToJobConfig(j.config, j.c) 119} 120 121func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) { 122 switch { 123 case q == nil: 124 return nil, nil 125 case q.Copy != nil: 126 return bqToCopyConfig(q, c), nil 127 case q.Extract != nil: 128 return bqToExtractConfig(q, c), nil 129 case q.Load != nil: 130 return bqToLoadConfig(q, c), nil 131 case q.Query != nil: 132 return bqToQueryConfig(q, c) 133 default: 134 return nil, nil 135 } 136} 137 138// JobIDConfig describes how to create an ID for a job. 139type JobIDConfig struct { 140 // JobID is the ID to use for the job. If empty, a random job ID will be generated. 141 JobID string 142 143 // If AddJobIDSuffix is true, then a random string will be appended to JobID. 144 AddJobIDSuffix bool 145 146 // Location is the location for the job. 147 Location string 148} 149 150// createJobRef creates a JobReference. 151func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference { 152 // We don't check whether projectID is empty; the server will return an 153 // error when it encounters the resulting JobReference. 154 loc := j.Location 155 if loc == "" { // Use Client.Location as a default. 156 loc = c.Location 157 } 158 jr := &bq.JobReference{ProjectId: c.projectID, Location: loc} 159 if j.JobID == "" { 160 jr.JobId = randomIDFn() 161 } else if j.AddJobIDSuffix { 162 jr.JobId = j.JobID + "-" + randomIDFn() 163 } else { 164 jr.JobId = j.JobID 165 } 166 return jr 167} 168 169// Done reports whether the job has completed. 170// After Done returns true, the Err method will return an error if the job completed unsuccessfully. 171func (s *JobStatus) Done() bool { 172 return s.State == Done 173} 174 175// Err returns the error that caused the job to complete unsuccessfully (if any). 176func (s *JobStatus) Err() error { 177 return s.err 178} 179 180// Status retrieves the current status of the job from BigQuery. It fails if the Status could not be determined. 181func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) { 182 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status") 183 defer func() { trace.EndSpan(ctx, err) }() 184 185 bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, "status", "statistics") 186 if err != nil { 187 return nil, err 188 } 189 if err := j.setStatus(bqjob.Status); err != nil { 190 return nil, err 191 } 192 j.setStatistics(bqjob.Statistics, j.c) 193 return j.lastStatus, nil 194} 195 196// LastStatus returns the most recently retrieved status of the job. The status is 197// retrieved when a new job is created, or when JobFromID or Job.Status is called. 198// Call Job.Status to get the most up-to-date information about a job. 199func (j *Job) LastStatus() *JobStatus { 200 return j.lastStatus 201} 202 203// Cancel requests that a job be cancelled. This method returns without waiting for 204// cancellation to take effect. To check whether the job has terminated, use Job.Status. 205// Cancelled jobs may still incur costs. 206func (j *Job) Cancel(ctx context.Context) error { 207 // Jobs.Cancel returns a job entity, but the only relevant piece of 208 // data it may contain (the status of the job) is unreliable. From the 209 // docs: "This call will return immediately, and the client will need 210 // to poll for the job status to see if the cancel completed 211 // successfully". So it would be misleading to return a status. 212 call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID). 213 Location(j.location). 214 Fields(). // We don't need any of the response data. 215 Context(ctx) 216 setClientHeader(call.Header()) 217 return runWithRetry(ctx, func() error { 218 _, err := call.Do() 219 return err 220 }) 221} 222 223// Wait blocks until the job or the context is done. It returns the final status 224// of the job. 225// If an error occurs while retrieving the status, Wait returns that error. But 226// Wait returns nil if the status was retrieved successfully, even if 227// status.Err() != nil. So callers must check both errors. See the example. 228func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) { 229 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait") 230 defer func() { trace.EndSpan(ctx, err) }() 231 232 if j.isQuery() { 233 // We can avoid polling for query jobs. 234 if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil { 235 return nil, err 236 } 237 // Note: extra RPC even if you just want to wait for the query to finish. 238 js, err := j.Status(ctx) 239 if err != nil { 240 return nil, err 241 } 242 return js, nil 243 } 244 // Non-query jobs must poll. 245 err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { 246 js, err = j.Status(ctx) 247 if err != nil { 248 return true, err 249 } 250 if js.Done() { 251 return true, nil 252 } 253 return false, nil 254 }) 255 if err != nil { 256 return nil, err 257 } 258 return js, nil 259} 260 261// Read fetches the results of a query job. 262// If j is not a query job, Read returns an error. 263func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) { 264 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read") 265 defer func() { trace.EndSpan(ctx, err) }() 266 267 return j.read(ctx, j.waitForQuery, fetchPage) 268} 269 270func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) { 271 if !j.isQuery() { 272 return nil, errors.New("bigquery: cannot read from a non-query job") 273 } 274 destTable := j.config.Query.DestinationTable 275 // The destination table should only be nil if there was a query error. 276 projectID := j.projectID 277 if destTable != nil && projectID != destTable.ProjectId { 278 return nil, fmt.Errorf("bigquery: job project ID is %q, but destination table's is %q", projectID, destTable.ProjectId) 279 } 280 schema, totalRows, err := waitForQuery(ctx, projectID) 281 if err != nil { 282 return nil, err 283 } 284 if destTable == nil { 285 return nil, errors.New("bigquery: query job missing destination table") 286 } 287 dt := bqToTable(destTable, j.c) 288 if totalRows == 0 { 289 pf = nil 290 } 291 it := newRowIterator(ctx, dt, pf) 292 it.Schema = schema 293 it.TotalRows = totalRows 294 return it, nil 295} 296 297// waitForQuery waits for the query job to complete and returns its schema. It also 298// returns the total number of rows in the result set. 299func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) { 300 // Use GetQueryResults only to wait for completion, not to read results. 301 call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0) 302 setClientHeader(call.Header()) 303 backoff := gax.Backoff{ 304 Initial: 1 * time.Second, 305 Multiplier: 2, 306 Max: 60 * time.Second, 307 } 308 var res *bq.GetQueryResultsResponse 309 err := internal.Retry(ctx, backoff, func() (stop bool, err error) { 310 res, err = call.Do() 311 if err != nil { 312 return !retryableError(err), err 313 } 314 if !res.JobComplete { // GetQueryResults may return early without error; retry. 315 return false, nil 316 } 317 return true, nil 318 }) 319 if err != nil { 320 return nil, 0, err 321 } 322 return bqToSchema(res.Schema), res.TotalRows, nil 323} 324 325// JobStatistics contains statistics about a job. 326type JobStatistics struct { 327 CreationTime time.Time 328 StartTime time.Time 329 EndTime time.Time 330 TotalBytesProcessed int64 331 332 Details Statistics 333} 334 335// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics. 336type Statistics interface { 337 implementsStatistics() 338} 339 340// ExtractStatistics contains statistics about an extract job. 341type ExtractStatistics struct { 342 // The number of files per destination URI or URI pattern specified in the 343 // extract configuration. These values will be in the same order as the 344 // URIs specified in the 'destinationUris' field. 345 DestinationURIFileCounts []int64 346} 347 348// LoadStatistics contains statistics about a load job. 349type LoadStatistics struct { 350 // The number of bytes of source data in a load job. 351 InputFileBytes int64 352 353 // The number of source files in a load job. 354 InputFiles int64 355 356 // Size of the loaded data in bytes. Note that while a load job is in the 357 // running state, this value may change. 358 OutputBytes int64 359 360 // The number of rows imported in a load job. Note that while an import job is 361 // in the running state, this value may change. 362 OutputRows int64 363} 364 365// QueryStatistics contains statistics about a query job. 366type QueryStatistics struct { 367 // Billing tier for the job. 368 BillingTier int64 369 370 // Whether the query result was fetched from the query cache. 371 CacheHit bool 372 373 // The type of query statement, if valid. 374 StatementType string 375 376 // Total bytes billed for the job. 377 TotalBytesBilled int64 378 379 // Total bytes processed for the job. 380 TotalBytesProcessed int64 381 382 // Describes execution plan for the query. 383 QueryPlan []*ExplainQueryStage 384 385 // The number of rows affected by a DML statement. Present only for DML 386 // statements INSERT, UPDATE or DELETE. 387 NumDMLAffectedRows int64 388 389 // Describes a timeline of job execution. 390 Timeline []*QueryTimelineSample 391 392 // ReferencedTables: [Output-only, Experimental] Referenced tables for 393 // the job. Queries that reference more than 50 tables will not have a 394 // complete list. 395 ReferencedTables []*Table 396 397 // The schema of the results. Present only for successful dry run of 398 // non-legacy SQL queries. 399 Schema Schema 400 401 // Slot-milliseconds consumed by this query job. 402 SlotMillis int64 403 404 // Standard SQL: list of undeclared query parameter names detected during a 405 // dry run validation. 406 UndeclaredQueryParameterNames []string 407 408 // DDL target table. 409 DDLTargetTable *Table 410 411 // DDL Operation performed on the target table. Used to report how the 412 // query impacted the DDL target table. 413 DDLOperationPerformed string 414} 415 416// ExplainQueryStage describes one stage of a query. 417type ExplainQueryStage struct { 418 // CompletedParallelInputs: Number of parallel input segments completed. 419 CompletedParallelInputs int64 420 421 // ComputeAvg: Duration the average shard spent on CPU-bound tasks. 422 ComputeAvg time.Duration 423 424 // ComputeMax: Duration the slowest shard spent on CPU-bound tasks. 425 ComputeMax time.Duration 426 427 // Relative amount of the total time the average shard spent on CPU-bound tasks. 428 ComputeRatioAvg float64 429 430 // Relative amount of the total time the slowest shard spent on CPU-bound tasks. 431 ComputeRatioMax float64 432 433 // EndTime: Stage end time. 434 EndTime time.Time 435 436 // Unique ID for stage within plan. 437 ID int64 438 439 // InputStages: IDs for stages that are inputs to this stage. 440 InputStages []int64 441 442 // Human-readable name for stage. 443 Name string 444 445 // ParallelInputs: Number of parallel input segments to be processed. 446 ParallelInputs int64 447 448 // ReadAvg: Duration the average shard spent reading input. 449 ReadAvg time.Duration 450 451 // ReadMax: Duration the slowest shard spent reading input. 452 ReadMax time.Duration 453 454 // Relative amount of the total time the average shard spent reading input. 455 ReadRatioAvg float64 456 457 // Relative amount of the total time the slowest shard spent reading input. 458 ReadRatioMax float64 459 460 // Number of records read into the stage. 461 RecordsRead int64 462 463 // Number of records written by the stage. 464 RecordsWritten int64 465 466 // ShuffleOutputBytes: Total number of bytes written to shuffle. 467 ShuffleOutputBytes int64 468 469 // ShuffleOutputBytesSpilled: Total number of bytes written to shuffle 470 // and spilled to disk. 471 ShuffleOutputBytesSpilled int64 472 473 // StartTime: Stage start time. 474 StartTime time.Time 475 476 // Current status for the stage. 477 Status string 478 479 // List of operations within the stage in dependency order (approximately 480 // chronological). 481 Steps []*ExplainQueryStep 482 483 // WaitAvg: Duration the average shard spent waiting to be scheduled. 484 WaitAvg time.Duration 485 486 // WaitMax: Duration the slowest shard spent waiting to be scheduled. 487 WaitMax time.Duration 488 489 // Relative amount of the total time the average shard spent waiting to be scheduled. 490 WaitRatioAvg float64 491 492 // Relative amount of the total time the slowest shard spent waiting to be scheduled. 493 WaitRatioMax float64 494 495 // WriteAvg: Duration the average shard spent on writing output. 496 WriteAvg time.Duration 497 498 // WriteMax: Duration the slowest shard spent on writing output. 499 WriteMax time.Duration 500 501 // Relative amount of the total time the average shard spent on writing output. 502 WriteRatioAvg float64 503 504 // Relative amount of the total time the slowest shard spent on writing output. 505 WriteRatioMax float64 506} 507 508// ExplainQueryStep describes one step of a query stage. 509type ExplainQueryStep struct { 510 // Machine-readable operation type. 511 Kind string 512 513 // Human-readable stage descriptions. 514 Substeps []string 515} 516 517// QueryTimelineSample represents a sample of execution statistics at a point in time. 518type QueryTimelineSample struct { 519 520 // Total number of units currently being processed by workers, represented as largest value since last sample. 521 ActiveUnits int64 522 523 // Total parallel units of work completed by this query. 524 CompletedUnits int64 525 526 // Time elapsed since start of query execution. 527 Elapsed time.Duration 528 529 // Total parallel units of work remaining for the active stages. 530 PendingUnits int64 531 532 // Cumulative slot-milliseconds consumed by the query. 533 SlotMillis int64 534} 535 536func (*ExtractStatistics) implementsStatistics() {} 537func (*LoadStatistics) implementsStatistics() {} 538func (*QueryStatistics) implementsStatistics() {} 539 540// Jobs lists jobs within a project. 541func (c *Client) Jobs(ctx context.Context) *JobIterator { 542 it := &JobIterator{ 543 ctx: ctx, 544 c: c, 545 ProjectID: c.projectID, 546 } 547 it.pageInfo, it.nextFunc = iterator.NewPageInfo( 548 it.fetch, 549 func() int { return len(it.items) }, 550 func() interface{} { b := it.items; it.items = nil; return b }) 551 return it 552} 553 554// JobIterator iterates over jobs in a project. 555type JobIterator struct { 556 ProjectID string // Project ID of the jobs to list. Default is the client's project. 557 AllUsers bool // Whether to list jobs owned by all users in the project, or just the current caller. 558 State State // List only jobs in the given state. Defaults to all states. 559 MinCreationTime time.Time // List only jobs created after this time. 560 MaxCreationTime time.Time // List only jobs created before this time. 561 562 ctx context.Context 563 c *Client 564 pageInfo *iterator.PageInfo 565 nextFunc func() error 566 items []*Job 567} 568 569func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo } 570 571func (it *JobIterator) Next() (*Job, error) { 572 if err := it.nextFunc(); err != nil { 573 return nil, err 574 } 575 item := it.items[0] 576 it.items = it.items[1:] 577 return item, nil 578} 579 580func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) { 581 var st string 582 switch it.State { 583 case StateUnspecified: 584 st = "" 585 case Pending: 586 st = "pending" 587 case Running: 588 st = "running" 589 case Done: 590 st = "done" 591 default: 592 return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State) 593 } 594 595 req := it.c.bqs.Jobs.List(it.ProjectID). 596 Context(it.ctx). 597 PageToken(pageToken). 598 Projection("full"). 599 AllUsers(it.AllUsers) 600 if st != "" { 601 req.StateFilter(st) 602 } 603 if !it.MinCreationTime.IsZero() { 604 req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6)) 605 } 606 if !it.MaxCreationTime.IsZero() { 607 req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6)) 608 } 609 setClientHeader(req.Header()) 610 if pageSize > 0 { 611 req.MaxResults(int64(pageSize)) 612 } 613 res, err := req.Do() 614 if err != nil { 615 return "", err 616 } 617 for _, j := range res.Jobs { 618 job, err := convertListedJob(j, it.c) 619 if err != nil { 620 return "", err 621 } 622 it.items = append(it.items, job) 623 } 624 return res.NextPageToken, nil 625} 626 627func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) { 628 return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c) 629} 630 631func (c *Client) getJobInternal(ctx context.Context, jobID, location string, fields ...googleapi.Field) (*bq.Job, error) { 632 var job *bq.Job 633 call := c.bqs.Jobs.Get(c.projectID, jobID).Context(ctx) 634 if location != "" { 635 call = call.Location(location) 636 } 637 if len(fields) > 0 { 638 call = call.Fields(fields...) 639 } 640 setClientHeader(call.Header()) 641 err := runWithRetry(ctx, func() (err error) { 642 job, err = call.Do() 643 return err 644 }) 645 if err != nil { 646 return nil, err 647 } 648 return job, nil 649} 650 651func bqToJob(q *bq.Job, c *Client) (*Job, error) { 652 return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c) 653} 654 655func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) { 656 j := &Job{ 657 projectID: qr.ProjectId, 658 jobID: qr.JobId, 659 location: qr.Location, 660 c: c, 661 email: email, 662 } 663 j.setConfig(qc) 664 if err := j.setStatus(qs); err != nil { 665 return nil, err 666 } 667 j.setStatistics(qt, c) 668 return j, nil 669} 670 671func (j *Job) setConfig(config *bq.JobConfiguration) { 672 if config == nil { 673 return 674 } 675 j.config = config 676} 677 678func (j *Job) isQuery() bool { 679 return j.config != nil && j.config.Query != nil 680} 681 682var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done} 683 684func (j *Job) setStatus(qs *bq.JobStatus) error { 685 if qs == nil { 686 return nil 687 } 688 state, ok := stateMap[qs.State] 689 if !ok { 690 return fmt.Errorf("unexpected job state: %v", qs.State) 691 } 692 j.lastStatus = &JobStatus{ 693 State: state, 694 err: nil, 695 } 696 if err := bqToError(qs.ErrorResult); state == Done && err != nil { 697 j.lastStatus.err = err 698 } 699 for _, ep := range qs.Errors { 700 j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep)) 701 } 702 return nil 703} 704 705func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) { 706 if s == nil || j.lastStatus == nil { 707 return 708 } 709 js := &JobStatistics{ 710 CreationTime: unixMillisToTime(s.CreationTime), 711 StartTime: unixMillisToTime(s.StartTime), 712 EndTime: unixMillisToTime(s.EndTime), 713 TotalBytesProcessed: s.TotalBytesProcessed, 714 } 715 switch { 716 case s.Extract != nil: 717 js.Details = &ExtractStatistics{ 718 DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts), 719 } 720 case s.Load != nil: 721 js.Details = &LoadStatistics{ 722 InputFileBytes: s.Load.InputFileBytes, 723 InputFiles: s.Load.InputFiles, 724 OutputBytes: s.Load.OutputBytes, 725 OutputRows: s.Load.OutputRows, 726 } 727 case s.Query != nil: 728 var names []string 729 for _, qp := range s.Query.UndeclaredQueryParameters { 730 names = append(names, qp.Name) 731 } 732 var tables []*Table 733 for _, tr := range s.Query.ReferencedTables { 734 tables = append(tables, bqToTable(tr, c)) 735 } 736 js.Details = &QueryStatistics{ 737 BillingTier: s.Query.BillingTier, 738 CacheHit: s.Query.CacheHit, 739 DDLTargetTable: bqToTable(s.Query.DdlTargetTable, c), 740 DDLOperationPerformed: s.Query.DdlOperationPerformed, 741 StatementType: s.Query.StatementType, 742 TotalBytesBilled: s.Query.TotalBytesBilled, 743 TotalBytesProcessed: s.Query.TotalBytesProcessed, 744 NumDMLAffectedRows: s.Query.NumDmlAffectedRows, 745 QueryPlan: queryPlanFromProto(s.Query.QueryPlan), 746 Schema: bqToSchema(s.Query.Schema), 747 SlotMillis: s.Query.TotalSlotMs, 748 Timeline: timelineFromProto(s.Query.Timeline), 749 ReferencedTables: tables, 750 UndeclaredQueryParameterNames: names, 751 } 752 } 753 j.lastStatus.Statistics = js 754} 755 756func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage { 757 var res []*ExplainQueryStage 758 for _, s := range stages { 759 var steps []*ExplainQueryStep 760 for _, p := range s.Steps { 761 steps = append(steps, &ExplainQueryStep{ 762 Kind: p.Kind, 763 Substeps: p.Substeps, 764 }) 765 } 766 res = append(res, &ExplainQueryStage{ 767 CompletedParallelInputs: s.CompletedParallelInputs, 768 ComputeAvg: time.Duration(s.ComputeMsAvg) * time.Millisecond, 769 ComputeMax: time.Duration(s.ComputeMsMax) * time.Millisecond, 770 ComputeRatioAvg: s.ComputeRatioAvg, 771 ComputeRatioMax: s.ComputeRatioMax, 772 EndTime: time.Unix(0, s.EndMs*1e6), 773 ID: s.Id, 774 InputStages: s.InputStages, 775 Name: s.Name, 776 ParallelInputs: s.ParallelInputs, 777 ReadAvg: time.Duration(s.ReadMsAvg) * time.Millisecond, 778 ReadMax: time.Duration(s.ReadMsMax) * time.Millisecond, 779 ReadRatioAvg: s.ReadRatioAvg, 780 ReadRatioMax: s.ReadRatioMax, 781 RecordsRead: s.RecordsRead, 782 RecordsWritten: s.RecordsWritten, 783 ShuffleOutputBytes: s.ShuffleOutputBytes, 784 ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled, 785 StartTime: time.Unix(0, s.StartMs*1e6), 786 Status: s.Status, 787 Steps: steps, 788 WaitAvg: time.Duration(s.WaitMsAvg) * time.Millisecond, 789 WaitMax: time.Duration(s.WaitMsMax) * time.Millisecond, 790 WaitRatioAvg: s.WaitRatioAvg, 791 WaitRatioMax: s.WaitRatioMax, 792 WriteAvg: time.Duration(s.WriteMsAvg) * time.Millisecond, 793 WriteMax: time.Duration(s.WriteMsMax) * time.Millisecond, 794 WriteRatioAvg: s.WriteRatioAvg, 795 WriteRatioMax: s.WriteRatioMax, 796 }) 797 } 798 return res 799} 800 801func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample { 802 var res []*QueryTimelineSample 803 for _, s := range timeline { 804 res = append(res, &QueryTimelineSample{ 805 ActiveUnits: s.ActiveUnits, 806 CompletedUnits: s.CompletedUnits, 807 Elapsed: time.Duration(s.ElapsedMs) * time.Millisecond, 808 PendingUnits: s.PendingUnits, 809 SlotMillis: s.TotalSlotMs, 810 }) 811 } 812 return res 813} 814