// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"
	"fmt"
	"time"

	"cloud.google.com/go/internal"
	"cloud.google.com/go/internal/trace"
	gax "github.com/googleapis/gax-go/v2"
	bq "google.golang.org/api/bigquery/v2"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/iterator"
)

// A Job represents an operation which has been submitted to BigQuery for processing.
//
// All fields are unexported; use the ID, Location, Email, Config and
// LastStatus methods to read them.
type Job struct {
	c          *Client              // client used to issue API calls for this job
	projectID  string               // project ID copied from the job reference
	jobID      string               // job ID copied from the job reference
	location   string               // location copied from the job reference
	email      string               // creator's email, as returned by Email
	config     *bq.JobConfiguration // raw API configuration; stays nil when none was supplied
	lastStatus *JobStatus           // most recently retrieved status, as returned by LastStatus
}

// JobFromID creates a Job which refers to an existing BigQuery job. The job
// need not have been created by this package. For example, the job may have
// been created in the BigQuery console.
//
// It delegates to JobFromIDLocation using Client.Location as the location.
//
// For jobs whose location is other than "US" or "EU", set Client.Location or use
// JobFromIDLocation.
func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
	return c.JobFromIDLocation(ctx, id, c.Location)
}

// JobFromIDLocation creates a Job which refers to an existing BigQuery job. The job
// need not have been created by this package (for example, it may have
// been created in the BigQuery console), but it must exist in the specified location.
55func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) { 56 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromIDLocation") 57 defer func() { trace.EndSpan(ctx, err) }() 58 59 bqjob, err := c.getJobInternal(ctx, id, location, "configuration", "jobReference", "status", "statistics") 60 if err != nil { 61 return nil, err 62 } 63 return bqToJob(bqjob, c) 64} 65 66// ID returns the job's ID. 67func (j *Job) ID() string { 68 return j.jobID 69} 70 71// Location returns the job's location. 72func (j *Job) Location() string { 73 return j.location 74} 75 76// Email returns the email of the job's creator. 77func (j *Job) Email() string { 78 return j.email 79} 80 81// State is one of a sequence of states that a Job progresses through as it is processed. 82type State int 83 84const ( 85 // StateUnspecified is the default JobIterator state. 86 StateUnspecified State = iota 87 // Pending is a state that describes that the job is pending. 88 Pending 89 // Running is a state that describes that the job is running. 90 Running 91 // Done is a state that describes that the job is done. 92 Done 93) 94 95// JobStatus contains the current State of a job, and errors encountered while processing that job. 96type JobStatus struct { 97 State State 98 99 err error 100 101 // All errors encountered during the running of the job. 102 // Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful. 103 Errors []*Error 104 105 // Statistics about the job. 106 Statistics *JobStatistics 107} 108 109// JobConfig contains configuration information for a job. It is implemented by 110// *CopyConfig, *ExtractConfig, *LoadConfig and *QueryConfig. 
111type JobConfig interface { 112 isJobConfig() 113} 114 115func (*CopyConfig) isJobConfig() {} 116func (*ExtractConfig) isJobConfig() {} 117func (*LoadConfig) isJobConfig() {} 118func (*QueryConfig) isJobConfig() {} 119 120// Config returns the configuration information for j. 121func (j *Job) Config() (JobConfig, error) { 122 return bqToJobConfig(j.config, j.c) 123} 124 125// Children returns a job iterator for enumerating child jobs 126// of the current job. Currently only scripts, a form of query job, 127// will create child jobs. 128func (j *Job) Children(ctx context.Context) *JobIterator { 129 it := j.c.Jobs(ctx) 130 it.ParentJobID = j.ID() 131 return it 132} 133 134func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) { 135 switch { 136 case q == nil: 137 return nil, nil 138 case q.Copy != nil: 139 return bqToCopyConfig(q, c), nil 140 case q.Extract != nil: 141 return bqToExtractConfig(q, c), nil 142 case q.Load != nil: 143 return bqToLoadConfig(q, c), nil 144 case q.Query != nil: 145 return bqToQueryConfig(q, c) 146 default: 147 return nil, nil 148 } 149} 150 151// JobIDConfig describes how to create an ID for a job. 152type JobIDConfig struct { 153 // JobID is the ID to use for the job. If empty, a random job ID will be generated. 154 JobID string 155 156 // If AddJobIDSuffix is true, then a random string will be appended to JobID. 157 AddJobIDSuffix bool 158 159 // Location is the location for the job. 160 Location string 161} 162 163// createJobRef creates a JobReference. 164func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference { 165 // We don't check whether projectID is empty; the server will return an 166 // error when it encounters the resulting JobReference. 167 loc := j.Location 168 if loc == "" { // Use Client.Location as a default. 
169 loc = c.Location 170 } 171 jr := &bq.JobReference{ProjectId: c.projectID, Location: loc} 172 if j.JobID == "" { 173 jr.JobId = randomIDFn() 174 } else if j.AddJobIDSuffix { 175 jr.JobId = j.JobID + "-" + randomIDFn() 176 } else { 177 jr.JobId = j.JobID 178 } 179 return jr 180} 181 182// Done reports whether the job has completed. 183// After Done returns true, the Err method will return an error if the job completed unsuccessfully. 184func (s *JobStatus) Done() bool { 185 return s.State == Done 186} 187 188// Err returns the error that caused the job to complete unsuccessfully (if any). 189func (s *JobStatus) Err() error { 190 return s.err 191} 192 193// Status retrieves the current status of the job from BigQuery. It fails if the Status could not be determined. 194func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) { 195 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status") 196 defer func() { trace.EndSpan(ctx, err) }() 197 198 bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, "status", "statistics") 199 if err != nil { 200 return nil, err 201 } 202 if err := j.setStatus(bqjob.Status); err != nil { 203 return nil, err 204 } 205 j.setStatistics(bqjob.Statistics, j.c) 206 return j.lastStatus, nil 207} 208 209// LastStatus returns the most recently retrieved status of the job. The status is 210// retrieved when a new job is created, or when JobFromID or Job.Status is called. 211// Call Job.Status to get the most up-to-date information about a job. 212func (j *Job) LastStatus() *JobStatus { 213 return j.lastStatus 214} 215 216// Cancel requests that a job be cancelled. This method returns without waiting for 217// cancellation to take effect. To check whether the job has terminated, use Job.Status. 218// Cancelled jobs may still incur costs. 
219func (j *Job) Cancel(ctx context.Context) error { 220 // Jobs.Cancel returns a job entity, but the only relevant piece of 221 // data it may contain (the status of the job) is unreliable. From the 222 // docs: "This call will return immediately, and the client will need 223 // to poll for the job status to see if the cancel completed 224 // successfully". So it would be misleading to return a status. 225 call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID). 226 Location(j.location). 227 Fields(). // We don't need any of the response data. 228 Context(ctx) 229 setClientHeader(call.Header()) 230 return runWithRetry(ctx, func() error { 231 _, err := call.Do() 232 return err 233 }) 234} 235 236// Wait blocks until the job or the context is done. It returns the final status 237// of the job. 238// If an error occurs while retrieving the status, Wait returns that error. But 239// Wait returns nil if the status was retrieved successfully, even if 240// status.Err() != nil. So callers must check both errors. See the example. 241func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) { 242 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait") 243 defer func() { trace.EndSpan(ctx, err) }() 244 245 if j.isQuery() { 246 // We can avoid polling for query jobs. 247 if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil { 248 return nil, err 249 } 250 // Note: extra RPC even if you just want to wait for the query to finish. 251 js, err := j.Status(ctx) 252 if err != nil { 253 return nil, err 254 } 255 return js, nil 256 } 257 // Non-query jobs must poll. 258 err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { 259 js, err = j.Status(ctx) 260 if err != nil { 261 return true, err 262 } 263 if js.Done() { 264 return true, nil 265 } 266 return false, nil 267 }) 268 if err != nil { 269 return nil, err 270 } 271 return js, nil 272} 273 274// Read fetches the results of a query job. 275// If j is not a query job, Read returns an error. 
func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read")
	defer func() { trace.EndSpan(ctx, err) }()

	return j.read(ctx, j.waitForQuery, fetchPage)
}

// read implements Read with the wait step and the page fetcher passed in as
// parameters. It rejects non-query jobs and jobs whose destination table is in
// a different project than the job, then calls waitForQuery and returns a
// RowIterator over the destination table, pre-populated with the schema and
// total row count that waitForQuery reported.
func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) {
	if !j.isQuery() {
		return nil, errors.New("bigquery: cannot read from a non-query job")
	}
	destTable := j.config.Query.DestinationTable
	// The destination table should only be nil if there was a query error.
	projectID := j.projectID
	if destTable != nil && projectID != destTable.ProjectId {
		return nil, fmt.Errorf("bigquery: job project ID is %q, but destination table's is %q", projectID, destTable.ProjectId)
	}
	// Wait before checking for a nil destination table, so that an error
	// returned by the wait step takes precedence over the generic one below.
	schema, totalRows, err := waitForQuery(ctx, projectID)
	if err != nil {
		return nil, err
	}
	if destTable == nil {
		return nil, errors.New("bigquery: query job missing destination table")
	}
	dt := bqToTable(destTable, j.c)
	it := newRowIterator(ctx, dt, pf)
	it.Schema = schema
	it.TotalRows = totalRows
	return it, nil
}

// waitForQuery waits for the query job to complete and returns its schema. It also
// returns the total number of rows in the result set.
//
// The request is retried with exponential backoff (1s initial, doubling, capped
// at 60s) until the response reports JobComplete, a non-retryable error occurs,
// or internal.Retry gives up.
func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) {
	// Use GetQueryResults only to wait for completion, not to read results.
	call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0)
	setClientHeader(call.Header())
	backoff := gax.Backoff{
		Initial:    1 * time.Second,
		Multiplier: 2,
		Max:        60 * time.Second,
	}
	var res *bq.GetQueryResultsResponse
	err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
		res, err = call.Do()
		if err != nil {
			// Stop only on errors that retryableError does not consider retryable.
			return !retryableError(err), err
		}
		if !res.JobComplete { // GetQueryResults may return early without error; retry.
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		return nil, 0, err
	}
	return bqToSchema(res.Schema), res.TotalRows, nil
}

// JobStatistics contains statistics about a job.
type JobStatistics struct {
	CreationTime        time.Time
	StartTime           time.Time
	EndTime             time.Time
	TotalBytesProcessed int64

	// Details holds the job-type-specific statistics, if any.
	Details Statistics

	// NumChildJobs indicates the number of child jobs run as part of a script.
	NumChildJobs int64

	// ParentJobID indicates the origin job for jobs run as part of a script.
	ParentJobID string

	// ScriptStatistics includes information run as part of a child job within
	// a script.
	ScriptStatistics *ScriptStatistics
}

// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
type Statistics interface {
	// implementsStatistics is an unexported marker method; it restricts the
	// set of implementations to the statistics types declared in this package.
	implementsStatistics()
}

// ExtractStatistics contains statistics about an extract job.
type ExtractStatistics struct {
	// The number of files per destination URI or URI pattern specified in the
	// extract configuration. These values will be in the same order as the
	// URIs specified in the 'destinationUris' field.
	DestinationURIFileCounts []int64
}

// LoadStatistics contains statistics about a load job.
type LoadStatistics struct {
	// The number of bytes of source data in a load job.
	InputFileBytes int64

	// The number of source files in a load job.
	InputFiles int64

	// Size of the loaded data in bytes. Note that while a load job is in the
	// running state, this value may change.
	OutputBytes int64

	// The number of rows imported in a load job. Note that while an import job is
	// in the running state, this value may change.
	OutputRows int64
}

// QueryStatistics contains statistics about a query job.
type QueryStatistics struct {
	// Billing tier for the job.
	BillingTier int64

	// Whether the query result was fetched from the query cache.
	CacheHit bool

	// The type of query statement, if valid.
	StatementType string

	// Total bytes billed for the job.
	TotalBytesBilled int64

	// Total bytes processed for the job.
	TotalBytesProcessed int64

	// For dry run queries, indicates how accurate the TotalBytesProcessed value is.
	// When indicated, values include:
	// UNKNOWN: accuracy of the estimate is unknown.
	// PRECISE: estimate is precise.
	// LOWER_BOUND: estimate is lower bound of what the query would cost.
	// UPPER_BOUND: estimate is upper bound of what the query would cost.
	TotalBytesProcessedAccuracy string

	// Describes execution plan for the query.
	QueryPlan []*ExplainQueryStage

	// The number of rows affected by a DML statement. Present only for DML
	// statements INSERT, UPDATE or DELETE.
	NumDMLAffectedRows int64

	// Describes a timeline of job execution.
	Timeline []*QueryTimelineSample

	// ReferencedTables: [Output-only] Referenced tables for
	// the job. Queries that reference more than 50 tables will not have a
	// complete list.
	ReferencedTables []*Table

	// The schema of the results. Present only for successful dry run of
	// non-legacy SQL queries.
	Schema Schema

	// Slot-milliseconds consumed by this query job.
	SlotMillis int64

	// Standard SQL: list of undeclared query parameter names detected during a
	// dry run validation.
	UndeclaredQueryParameterNames []string

	// DDL target table.
	DDLTargetTable *Table

	// DDL Operation performed on the target table. Used to report how the
	// query impacted the DDL target table.
	DDLOperationPerformed string

	// The DDL target routine, present only for CREATE/DROP FUNCTION/PROCEDURE queries.
	DDLTargetRoutine *Routine
}

// ExplainQueryStage describes one stage of a query.
type ExplainQueryStage struct {
	// CompletedParallelInputs: Number of parallel input segments completed.
	CompletedParallelInputs int64

	// ComputeAvg: Duration the average shard spent on CPU-bound tasks.
	ComputeAvg time.Duration

	// ComputeMax: Duration the slowest shard spent on CPU-bound tasks.
	ComputeMax time.Duration

	// Relative amount of the total time the average shard spent on CPU-bound tasks.
	ComputeRatioAvg float64

	// Relative amount of the total time the slowest shard spent on CPU-bound tasks.
	ComputeRatioMax float64

	// EndTime: Stage end time.
	EndTime time.Time

	// Unique ID for stage within plan.
	ID int64

	// InputStages: IDs for stages that are inputs to this stage.
	InputStages []int64

	// Human-readable name for stage.
	Name string

	// ParallelInputs: Number of parallel input segments to be processed.
	ParallelInputs int64

	// ReadAvg: Duration the average shard spent reading input.
	ReadAvg time.Duration

	// ReadMax: Duration the slowest shard spent reading input.
	ReadMax time.Duration

	// Relative amount of the total time the average shard spent reading input.
	ReadRatioAvg float64

	// Relative amount of the total time the slowest shard spent reading input.
	ReadRatioMax float64

	// Number of records read into the stage.
	RecordsRead int64

	// Number of records written by the stage.
	RecordsWritten int64

	// ShuffleOutputBytes: Total number of bytes written to shuffle.
	ShuffleOutputBytes int64

	// ShuffleOutputBytesSpilled: Total number of bytes written to shuffle
	// and spilled to disk.
	ShuffleOutputBytesSpilled int64

	// StartTime: Stage start time.
	StartTime time.Time

	// Current status for the stage.
	Status string

	// List of operations within the stage in dependency order (approximately
	// chronological).
	Steps []*ExplainQueryStep

	// WaitAvg: Duration the average shard spent waiting to be scheduled.
	WaitAvg time.Duration

	// WaitMax: Duration the slowest shard spent waiting to be scheduled.
	WaitMax time.Duration

	// Relative amount of the total time the average shard spent waiting to be scheduled.
	WaitRatioAvg float64

	// Relative amount of the total time the slowest shard spent waiting to be scheduled.
	WaitRatioMax float64

	// WriteAvg: Duration the average shard spent on writing output.
	WriteAvg time.Duration

	// WriteMax: Duration the slowest shard spent on writing output.
	WriteMax time.Duration

	// Relative amount of the total time the average shard spent on writing output.
	WriteRatioAvg float64

	// Relative amount of the total time the slowest shard spent on writing output.
	WriteRatioMax float64
}

// ExplainQueryStep describes one step of a query stage.
type ExplainQueryStep struct {
	// Machine-readable operation type.
	Kind string

	// Human-readable stage descriptions.
	Substeps []string
}

// QueryTimelineSample represents a sample of execution statistics at a point in time.
type QueryTimelineSample struct {
	// Total number of units currently being processed by workers, represented as largest value since last sample.
	ActiveUnits int64

	// Total parallel units of work completed by this query.
	CompletedUnits int64

	// Time elapsed since start of query execution.
	Elapsed time.Duration

	// Total parallel units of work remaining for the active stages.
	PendingUnits int64

	// Cumulative slot-milliseconds consumed by the query.
	SlotMillis int64
}

// ScriptStatistics report information about script-based query jobs.
type ScriptStatistics struct {
	EvaluationKind string
	StackFrames    []*ScriptStackFrame
}

// bqToScriptStatistics converts the API representation of script statistics
// into a ScriptStatistics. A nil input yields nil. StackFrames stays nil when
// the input carries no stack frames.
func bqToScriptStatistics(bs *bq.ScriptStatistics) *ScriptStatistics {
	if bs == nil {
		return nil
	}
	ss := &ScriptStatistics{
		EvaluationKind: bs.EvaluationKind,
	}
	for _, f := range bs.StackFrames {
		ss.StackFrames = append(ss.StackFrames, bqToScriptStackFrame(f))
	}
	return ss
}

// ScriptStackFrame represents the location of the statement/expression being evaluated.
//
// Line and column numbers are defined as follows:
//
// - Line and column numbers start with one. That is, line 1 column 1 denotes
//   the start of the script.
// - When inside a stored procedure, all line/column numbers are relative
//   to the procedure body, not the script in which the procedure was defined.
// - Start/end positions exclude leading/trailing comments and whitespace.
//   The end position always ends with a ";", when present.
// - Multi-byte Unicode characters are treated as just one column.
// - If the original script (or procedure definition) contains TAB characters,
//   a tab "snaps" the indentation forward to the nearest multiple of 8
//   characters, plus 1. For example, a TAB on column 1, 2, 3, 4, 5, 6, 7, or 8
//   will advance the next character to column 9. A TAB on column 9, 10, 11,
//   12, 13, 14, 15, or 16 will advance the next character to column 17.
602type ScriptStackFrame struct { 603 StartLine int64 604 StartColumn int64 605 EndLine int64 606 EndColumn int64 607 // Name of the active procedure. Empty if in a top-level script. 608 ProcedureID string 609 // Text of the current statement/expression. 610 Text string 611} 612 613func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame { 614 if bsf == nil { 615 return nil 616 } 617 return &ScriptStackFrame{ 618 StartLine: bsf.StartLine, 619 StartColumn: bsf.StartColumn, 620 EndLine: bsf.EndLine, 621 EndColumn: bsf.EndColumn, 622 ProcedureID: bsf.ProcedureId, 623 Text: bsf.Text, 624 } 625} 626 627func (*ExtractStatistics) implementsStatistics() {} 628func (*LoadStatistics) implementsStatistics() {} 629func (*QueryStatistics) implementsStatistics() {} 630 631// Jobs lists jobs within a project. 632func (c *Client) Jobs(ctx context.Context) *JobIterator { 633 it := &JobIterator{ 634 ctx: ctx, 635 c: c, 636 ProjectID: c.projectID, 637 } 638 it.pageInfo, it.nextFunc = iterator.NewPageInfo( 639 it.fetch, 640 func() int { return len(it.items) }, 641 func() interface{} { b := it.items; it.items = nil; return b }) 642 return it 643} 644 645// JobIterator iterates over jobs in a project. 646type JobIterator struct { 647 ProjectID string // Project ID of the jobs to list. Default is the client's project. 648 AllUsers bool // Whether to list jobs owned by all users in the project, or just the current caller. 649 State State // List only jobs in the given state. Defaults to all states. 650 MinCreationTime time.Time // List only jobs created after this time. 651 MaxCreationTime time.Time // List only jobs created before this time. 652 ParentJobID string // List only jobs that are children of a given scripting job. 653 654 ctx context.Context 655 c *Client 656 pageInfo *iterator.PageInfo 657 nextFunc func() error 658 items []*Job 659} 660 661// PageInfo is a getter for the JobIterator's PageInfo. 
func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }

// Next returns the next Job. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls will
// return Done.
func (it *JobIterator) Next() (*Job, error) {
	if err := it.nextFunc(); err != nil {
		return nil, err
	}
	// Pop the first buffered job.
	item := it.items[0]
	it.items = it.items[1:]
	return item, nil
}

// fetch requests one page of jobs and appends the converted jobs to the
// iterator's buffer. It is the fetch function handed to iterator.NewPageInfo
// in Client.Jobs. It returns the token of the next page, as reported by the
// response.
//
// An unrecognized it.State is rejected before any request is made.
func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
	// Translate the State filter into the string form sent with the request.
	var st string
	switch it.State {
	case StateUnspecified:
		st = ""
	case Pending:
		st = "pending"
	case Running:
		st = "running"
	case Done:
		st = "done"
	default:
		return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
	}

	req := it.c.bqs.Jobs.List(it.ProjectID).
		Context(it.ctx).
		PageToken(pageToken).
		Projection("full").
		AllUsers(it.AllUsers)
	if st != "" {
		req.StateFilter(st)
	}
	// Creation-time bounds are sent as milliseconds since the Unix epoch.
	if !it.MinCreationTime.IsZero() {
		req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6))
	}
	if !it.MaxCreationTime.IsZero() {
		req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6))
	}
	setClientHeader(req.Header())
	if pageSize > 0 {
		req.MaxResults(int64(pageSize))
	}
	if it.ParentJobID != "" {
		req.ParentJobId(it.ParentJobID)
	}
	res, err := req.Do()
	if err != nil {
		return "", err
	}
	for _, j := range res.Jobs {
		job, err := convertListedJob(j, it.c)
		if err != nil {
			return "", err
		}
		it.items = append(it.items, job)
	}
	return res.NextPageToken, nil
}

// convertListedJob converts one entry of a job listing into a Job.
func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) {
	return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c)
}

// getJobInternal fetches the job with the given ID through Jobs.Get, retrying
// via runWithRetry. The request always targets the client's project
// (c.projectID). The location and the field mask are applied only when
// non-empty.
func (c *Client) getJobInternal(ctx context.Context, jobID, location string, fields ...googleapi.Field) (*bq.Job, error) {
	var job *bq.Job
	call := c.bqs.Jobs.Get(c.projectID, jobID).Context(ctx)
	if location != "" {
		call = call.Location(location)
	}
	if len(fields) > 0 {
		call = call.Fields(fields...)
	}
	setClientHeader(call.Header())
	err := runWithRetry(ctx, func() (err error) {
		job, err = call.Do()
		return err
	})
	if err != nil {
		return nil, err
	}
	return job, nil
}

// bqToJob converts an API job resource into a Job bound to client c.
func bqToJob(q *bq.Job, c *Client) (*Job, error) {
	return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c)
}

// bqToJob2 assembles a Job from the individual parts of an API job. qr is
// dereferenced unconditionally and so must be non-nil; qc, qs and qt may be
// nil. It fails only when the status carries a state that setStatus does not
// recognize.
func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) {
	j := &Job{
		projectID: qr.ProjectId,
		jobID:     qr.JobId,
		location:  qr.Location,
		c:         c,
		email:     email,
	}
	j.setConfig(qc)
	// The status has to be set before the statistics: setStatistics attaches
	// them to j.lastStatus and does nothing while that is still nil.
	if err := j.setStatus(qs); err != nil {
		return nil, err
	}
	j.setStatistics(qt, c)
	return j, nil
}

// setConfig stores config on the job. A nil config leaves any previously
// stored configuration in place.
func (j *Job) setConfig(config *bq.JobConfiguration) {
	if config == nil {
		return
	}
	j.config = config
}

// isQuery reports whether the job's stored configuration has a Query section.
func (j *Job) isQuery() bool {
	return j.config != nil && j.config.Query != nil
}

// stateMap maps the state strings found in an API job status to State values.
var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}

// setStatus replaces j.lastStatus with a JobStatus built from qs. A nil qs is
// a no-op. A state string missing from stateMap is reported as an error and
// leaves j.lastStatus unchanged.
func (j *Job) setStatus(qs *bq.JobStatus) error {
	if qs == nil {
		return nil
	}
	state, ok := stateMap[qs.State]
	if !ok {
		return fmt.Errorf("unexpected job state: %v", qs.State)
	}
	j.lastStatus = &JobStatus{
		State: state,
		err:   nil,
	}
	// The error result is surfaced through JobStatus.Err only once the job is Done.
	if err := bqToError(qs.ErrorResult); state == Done && err != nil {
		j.lastStatus.err = err
	}
	for _, ep := range qs.Errors {
		j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep))
	}
	return nil
}

// setStatistics converts s and attaches the result to j.lastStatus. It does
// nothing when s is nil or when the job has no status yet. Details is filled
// from the first non-nil section among Extract, Load and Query, and is left
// nil when none of them is set.
func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
	if s == nil || j.lastStatus == nil {
		return
	}
	js := &JobStatistics{
		CreationTime:        unixMillisToTime(s.CreationTime),
		StartTime:           unixMillisToTime(s.StartTime),
		EndTime:             unixMillisToTime(s.EndTime),
		TotalBytesProcessed: s.TotalBytesProcessed,
		NumChildJobs:        s.NumChildJobs,
		ParentJobID:         s.ParentJobId,
		ScriptStatistics:    bqToScriptStatistics(s.ScriptStatistics),
	}
	switch {
	case s.Extract != nil:
		js.Details = &ExtractStatistics{
			DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts),
		}
	case s.Load != nil:
		js.Details = &LoadStatistics{
			InputFileBytes: s.Load.InputFileBytes,
			InputFiles:     s.Load.InputFiles,
			OutputBytes:    s.Load.OutputBytes,
			OutputRows:     s.Load.OutputRows,
		}
	case s.Query != nil:
		// Only the names of the undeclared parameters are kept.
		var names []string
		for _, qp := range s.Query.UndeclaredQueryParameters {
			names = append(names, qp.Name)
		}
		var tables []*Table
		for _, tr := range s.Query.ReferencedTables {
			tables = append(tables, bqToTable(tr, c))
		}
		js.Details = &QueryStatistics{
			BillingTier:                   s.Query.BillingTier,
			CacheHit:                      s.Query.CacheHit,
			DDLTargetTable:                bqToTable(s.Query.DdlTargetTable, c),
			DDLOperationPerformed:         s.Query.DdlOperationPerformed,
			DDLTargetRoutine:              bqToRoutine(s.Query.DdlTargetRoutine, c),
			StatementType:                 s.Query.StatementType,
			TotalBytesBilled:              s.Query.TotalBytesBilled,
			TotalBytesProcessed:           s.Query.TotalBytesProcessed,
			TotalBytesProcessedAccuracy:   s.Query.TotalBytesProcessedAccuracy,
			NumDMLAffectedRows:            s.Query.NumDmlAffectedRows,
			QueryPlan:                     queryPlanFromProto(s.Query.QueryPlan),
			Schema:                        bqToSchema(s.Query.Schema),
			SlotMillis:                    s.Query.TotalSlotMs,
			Timeline:                      timelineFromProto(s.Query.Timeline),
			ReferencedTables:              tables,
			UndeclaredQueryParameterNames: names,
		}
	}
	j.lastStatus.Statistics = js
}

// queryPlanFromProto converts the API query plan into ExplainQueryStage
// values, one per input stage and in the same order. The *Ms fields are
// treated as milliseconds: durations are scaled by time.Millisecond, and the
// start/end values are multiplied by 1e6 to obtain the nanoseconds passed to
// time.Unix. An empty input yields a nil slice.
func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage {
	var res []*ExplainQueryStage
	for _, s := range stages {
		var steps []*ExplainQueryStep
		for _, p := range s.Steps {
			steps = append(steps, &ExplainQueryStep{
				Kind:     p.Kind,
				Substeps: p.Substeps,
			})
		}
		res = append(res, &ExplainQueryStage{
			CompletedParallelInputs:   s.CompletedParallelInputs,
			ComputeAvg:                time.Duration(s.ComputeMsAvg) * time.Millisecond,
			ComputeMax:                time.Duration(s.ComputeMsMax) * time.Millisecond,
			ComputeRatioAvg:           s.ComputeRatioAvg,
			ComputeRatioMax:           s.ComputeRatioMax,
			EndTime:                   time.Unix(0, s.EndMs*1e6),
			ID:                        s.Id,
			InputStages:               s.InputStages,
			Name:                      s.Name,
			ParallelInputs:            s.ParallelInputs,
			ReadAvg:                   time.Duration(s.ReadMsAvg) * time.Millisecond,
			ReadMax:                   time.Duration(s.ReadMsMax) * time.Millisecond,
			ReadRatioAvg:              s.ReadRatioAvg,
			ReadRatioMax:              s.ReadRatioMax,
			RecordsRead:               s.RecordsRead,
			RecordsWritten:            s.RecordsWritten,
			ShuffleOutputBytes:        s.ShuffleOutputBytes,
			ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled,
			StartTime:                 time.Unix(0, s.StartMs*1e6),
			Status:                    s.Status,
			Steps:                     steps,
			WaitAvg:                   time.Duration(s.WaitMsAvg) * time.Millisecond,
			WaitMax:                   time.Duration(s.WaitMsMax) * time.Millisecond,
			WaitRatioAvg:              s.WaitRatioAvg,
			WaitRatioMax:              s.WaitRatioMax,
			WriteAvg:                  time.Duration(s.WriteMsAvg) * time.Millisecond,
			WriteMax:                  time.Duration(s.WriteMsMax) * time.Millisecond,
			WriteRatioAvg:             s.WriteRatioAvg,
			WriteRatioMax:             s.WriteRatioMax,
		})
	}
	return res
}

// timelineFromProto converts the API timeline samples into
// QueryTimelineSample values, one per input sample and in the same order.
// ElapsedMs is scaled by time.Millisecond. An empty input yields a nil slice.
func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample {
	var res []*QueryTimelineSample
	for _, s := range timeline {
		res = append(res, &QueryTimelineSample{
			ActiveUnits:    s.ActiveUnits,
			CompletedUnits: s.CompletedUnits,
			Elapsed:        time.Duration(s.ElapsedMs) * time.Millisecond,
			PendingUnits:   s.PendingUnits,
			SlotMillis:     s.TotalSlotMs,
		})
	}
	return res
}