// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"
	"fmt"
	"time"

	"cloud.google.com/go/internal"
	"cloud.google.com/go/internal/trace"
	gax "github.com/googleapis/gax-go/v2"
	bq "google.golang.org/api/bigquery/v2"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/iterator"
)

// A Job represents an operation which has been submitted to BigQuery for processing.
type Job struct {
	// c is the client through which all API calls for this job are issued.
	c *Client

	// projectID, jobID and location identify the job; they are copied from
	// the job reference returned by the service.
	projectID string
	jobID     string
	location  string

	// email is the user email reported by the service for the job.
	email string

	// config is the raw job configuration as returned by the service. It is
	// nil when no configuration was available.
	config *bq.JobConfiguration

	// lastStatus is the most recently retrieved status. It is nil until a
	// status has been recorded for the job.
	lastStatus *JobStatus
}

// JobFromID creates a Job which refers to an existing BigQuery job. The job
// need not have been created by this package. For example, the job may have
// been created in the BigQuery console.
//
// For jobs whose location is other than "US" or "EU", set Client.Location or use
// JobFromIDLocation.
func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
	// Delegate to the location-aware variant using the client's default location.
	return c.JobFromIDLocation(ctx, id, c.Location)
}

// JobFromIDLocation creates a Job which refers to an existing BigQuery job. The job
// need not have been created by this package (for example, it may have
// been created in the BigQuery console), but it must exist in the specified location.
func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) {
	// err is a named result so that the deferred EndSpan observes the final error.
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromIDLocation")
	defer func() { trace.EndSpan(ctx, err) }()

	// Request only the fields needed to build a Job.
	bqjob, err := c.getJobInternal(ctx, id, location, "configuration", "jobReference", "status", "statistics")
	if err != nil {
		return nil, err
	}
	return bqToJob(bqjob, c)
}

// ProjectID returns the job's associated project.
func (j *Job) ProjectID() string {
	return j.projectID
}

// ID returns the job's ID.
func (j *Job) ID() string {
	return j.jobID
}

// Location returns the job's location.
func (j *Job) Location() string {
	return j.location
}

// Email returns the email of the job's creator.
func (j *Job) Email() string {
	return j.email
}

// State is one of a sequence of states that a Job progresses through as it is processed.
type State int

const (
	// StateUnspecified is the default JobIterator state.
	StateUnspecified State = iota
	// Pending is a state that describes that the job is pending.
	Pending
	// Running is a state that describes that the job is running.
	Running
	// Done is a state that describes that the job is done.
	Done
)

// JobStatus contains the current State of a job, and errors encountered while processing that job.
type JobStatus struct {
	State State

	// err is the error that caused the job to finish unsuccessfully, if any.
	// It is exposed through the Err method.
	err error

	// All errors encountered during the running of the job.
	// Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful.
	Errors []*Error

	// Statistics about the job.
	Statistics *JobStatistics
}

// JobConfig contains configuration information for a job. It is implemented by
// *CopyConfig, *ExtractConfig, *LoadConfig and *QueryConfig.
116type JobConfig interface { 117 isJobConfig() 118} 119 120func (*CopyConfig) isJobConfig() {} 121func (*ExtractConfig) isJobConfig() {} 122func (*LoadConfig) isJobConfig() {} 123func (*QueryConfig) isJobConfig() {} 124 125// Config returns the configuration information for j. 126func (j *Job) Config() (JobConfig, error) { 127 return bqToJobConfig(j.config, j.c) 128} 129 130// Children returns a job iterator for enumerating child jobs 131// of the current job. Currently only scripts, a form of query job, 132// will create child jobs. 133func (j *Job) Children(ctx context.Context) *JobIterator { 134 it := j.c.Jobs(ctx) 135 it.ParentJobID = j.ID() 136 return it 137} 138 139func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) { 140 switch { 141 case q == nil: 142 return nil, nil 143 case q.Copy != nil: 144 return bqToCopyConfig(q, c), nil 145 case q.Extract != nil: 146 return bqToExtractConfig(q, c), nil 147 case q.Load != nil: 148 return bqToLoadConfig(q, c), nil 149 case q.Query != nil: 150 return bqToQueryConfig(q, c) 151 default: 152 return nil, nil 153 } 154} 155 156// JobIDConfig describes how to create an ID for a job. 157type JobIDConfig struct { 158 // JobID is the ID to use for the job. If empty, a random job ID will be generated. 159 JobID string 160 161 // If AddJobIDSuffix is true, then a random string will be appended to JobID. 162 AddJobIDSuffix bool 163 164 // Location is the location for the job. 165 Location string 166} 167 168// createJobRef creates a JobReference. 169func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference { 170 // We don't check whether projectID is empty; the server will return an 171 // error when it encounters the resulting JobReference. 172 loc := j.Location 173 if loc == "" { // Use Client.Location as a default. 
174 loc = c.Location 175 } 176 jr := &bq.JobReference{ProjectId: c.projectID, Location: loc} 177 if j.JobID == "" { 178 jr.JobId = randomIDFn() 179 } else if j.AddJobIDSuffix { 180 jr.JobId = j.JobID + "-" + randomIDFn() 181 } else { 182 jr.JobId = j.JobID 183 } 184 return jr 185} 186 187// Done reports whether the job has completed. 188// After Done returns true, the Err method will return an error if the job completed unsuccessfully. 189func (s *JobStatus) Done() bool { 190 return s.State == Done 191} 192 193// Err returns the error that caused the job to complete unsuccessfully (if any). 194func (s *JobStatus) Err() error { 195 return s.err 196} 197 198// Status retrieves the current status of the job from BigQuery. It fails if the Status could not be determined. 199func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) { 200 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status") 201 defer func() { trace.EndSpan(ctx, err) }() 202 203 bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, "status", "statistics") 204 if err != nil { 205 return nil, err 206 } 207 if err := j.setStatus(bqjob.Status); err != nil { 208 return nil, err 209 } 210 j.setStatistics(bqjob.Statistics, j.c) 211 return j.lastStatus, nil 212} 213 214// LastStatus returns the most recently retrieved status of the job. The status is 215// retrieved when a new job is created, or when JobFromID or Job.Status is called. 216// Call Job.Status to get the most up-to-date information about a job. 217func (j *Job) LastStatus() *JobStatus { 218 return j.lastStatus 219} 220 221// Cancel requests that a job be cancelled. This method returns without waiting for 222// cancellation to take effect. To check whether the job has terminated, use Job.Status. 223// Cancelled jobs may still incur costs. 
224func (j *Job) Cancel(ctx context.Context) error { 225 // Jobs.Cancel returns a job entity, but the only relevant piece of 226 // data it may contain (the status of the job) is unreliable. From the 227 // docs: "This call will return immediately, and the client will need 228 // to poll for the job status to see if the cancel completed 229 // successfully". So it would be misleading to return a status. 230 call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID). 231 Location(j.location). 232 Fields(). // We don't need any of the response data. 233 Context(ctx) 234 setClientHeader(call.Header()) 235 return runWithRetry(ctx, func() error { 236 _, err := call.Do() 237 return err 238 }) 239} 240 241// Delete deletes the job. 242func (j *Job) Delete(ctx context.Context) (err error) { 243 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Delete") 244 defer func() { trace.EndSpan(ctx, err) }() 245 246 call := j.c.bqs.Jobs.Delete(j.projectID, j.jobID).Context(ctx) 247 if j.location != "" { 248 call = call.Location(j.location) 249 } 250 setClientHeader(call.Header()) 251 252 return runWithRetry(ctx, func() (err error) { 253 err = call.Do() 254 return err 255 }) 256} 257 258// Wait blocks until the job or the context is done. It returns the final status 259// of the job. 260// If an error occurs while retrieving the status, Wait returns that error. But 261// Wait returns nil if the status was retrieved successfully, even if 262// status.Err() != nil. So callers must check both errors. See the example. 263func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) { 264 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait") 265 defer func() { trace.EndSpan(ctx, err) }() 266 267 if j.isQuery() { 268 // We can avoid polling for query jobs. 269 if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil { 270 return nil, err 271 } 272 // Note: extra RPC even if you just want to wait for the query to finish. 
273 js, err := j.Status(ctx) 274 if err != nil { 275 return nil, err 276 } 277 return js, nil 278 } 279 // Non-query jobs must poll. 280 err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { 281 js, err = j.Status(ctx) 282 if err != nil { 283 return true, err 284 } 285 if js.Done() { 286 return true, nil 287 } 288 return false, nil 289 }) 290 if err != nil { 291 return nil, err 292 } 293 return js, nil 294} 295 296// Read fetches the results of a query job. 297// If j is not a query job, Read returns an error. 298func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) { 299 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read") 300 defer func() { trace.EndSpan(ctx, err) }() 301 302 return j.read(ctx, j.waitForQuery, fetchPage) 303} 304 305func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) { 306 if !j.isQuery() { 307 return nil, errors.New("bigquery: cannot read from a non-query job") 308 } 309 schema, totalRows, err := waitForQuery(ctx, j.projectID) 310 if err != nil { 311 return nil, err 312 } 313 // Shave off some potential overhead by only retaining the minimal job representation in the iterator. 314 itJob := &Job{ 315 c: j.c, 316 projectID: j.projectID, 317 jobID: j.jobID, 318 location: j.location, 319 } 320 it := newRowIterator(ctx, &rowSource{j: itJob}, pf) 321 it.Schema = schema 322 it.TotalRows = totalRows 323 return it, nil 324} 325 326// waitForQuery waits for the query job to complete and returns its schema. It also 327// returns the total number of rows in the result set. 328func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) { 329 // Use GetQueryResults only to wait for completion, not to read results. 
330 call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0) 331 setClientHeader(call.Header()) 332 backoff := gax.Backoff{ 333 Initial: 1 * time.Second, 334 Multiplier: 2, 335 Max: 60 * time.Second, 336 } 337 var res *bq.GetQueryResultsResponse 338 err := internal.Retry(ctx, backoff, func() (stop bool, err error) { 339 res, err = call.Do() 340 if err != nil { 341 return !retryableError(err), err 342 } 343 if !res.JobComplete { // GetQueryResults may return early without error; retry. 344 return false, nil 345 } 346 return true, nil 347 }) 348 if err != nil { 349 return nil, 0, err 350 } 351 return bqToSchema(res.Schema), res.TotalRows, nil 352} 353 354// JobStatistics contains statistics about a job. 355type JobStatistics struct { 356 CreationTime time.Time 357 StartTime time.Time 358 EndTime time.Time 359 TotalBytesProcessed int64 360 361 Details Statistics 362 363 // NumChildJobs indicates the number of child jobs run as part of a script. 364 NumChildJobs int64 365 366 // ParentJobID indicates the origin job for jobs run as part of a script. 367 ParentJobID string 368 369 // ScriptStatistics includes information run as part of a child job within 370 // a script. 371 ScriptStatistics *ScriptStatistics 372 373 // ReservationUsage attributes slot consumption to reservations. 374 ReservationUsage []*ReservationUsage 375 376 // TransactionInfo indicates the transaction ID associated with the job, if any. 377 TransactionInfo *TransactionInfo 378} 379 380// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics. 381type Statistics interface { 382 implementsStatistics() 383} 384 385// ExtractStatistics contains statistics about an extract job. 386type ExtractStatistics struct { 387 // The number of files per destination URI or URI pattern specified in the 388 // extract configuration. These values will be in the same order as the 389 // URIs specified in the 'destinationUris' field. 
390 DestinationURIFileCounts []int64 391} 392 393// LoadStatistics contains statistics about a load job. 394type LoadStatistics struct { 395 // The number of bytes of source data in a load job. 396 InputFileBytes int64 397 398 // The number of source files in a load job. 399 InputFiles int64 400 401 // Size of the loaded data in bytes. Note that while a load job is in the 402 // running state, this value may change. 403 OutputBytes int64 404 405 // The number of rows imported in a load job. Note that while an import job is 406 // in the running state, this value may change. 407 OutputRows int64 408} 409 410// QueryStatistics contains statistics about a query job. 411type QueryStatistics struct { 412 // Billing tier for the job. 413 BillingTier int64 414 415 // Whether the query result was fetched from the query cache. 416 CacheHit bool 417 418 // The type of query statement, if valid. 419 StatementType string 420 421 // Total bytes billed for the job. 422 TotalBytesBilled int64 423 424 // Total bytes processed for the job. 425 TotalBytesProcessed int64 426 427 // For dry run queries, indicates how accurate the TotalBytesProcessed value is. 428 // When indicated, values include: 429 // UNKNOWN: accuracy of the estimate is unknown. 430 // PRECISE: estimate is precise. 431 // LOWER_BOUND: estimate is lower bound of what the query would cost. 432 // UPPER_BOUND: estimate is upper bound of what the query would cost. 433 TotalBytesProcessedAccuracy string 434 435 // Describes execution plan for the query. 436 QueryPlan []*ExplainQueryStage 437 438 // The number of rows affected by a DML statement. Present only for DML 439 // statements INSERT, UPDATE or DELETE. 440 NumDMLAffectedRows int64 441 442 // DMLStats provides statistics about the row mutations performed by 443 // DML statements. 444 DMLStats *DMLStatistics 445 446 // Describes a timeline of job execution. 
447 Timeline []*QueryTimelineSample 448 449 // ReferencedTables: [Output-only] Referenced tables for 450 // the job. Queries that reference more than 50 tables will not have a 451 // complete list. 452 ReferencedTables []*Table 453 454 // The schema of the results. Present only for successful dry run of 455 // non-legacy SQL queries. 456 Schema Schema 457 458 // Slot-milliseconds consumed by this query job. 459 SlotMillis int64 460 461 // Standard SQL: list of undeclared query parameter names detected during a 462 // dry run validation. 463 UndeclaredQueryParameterNames []string 464 465 // DDL target table. 466 DDLTargetTable *Table 467 468 // DDL Operation performed on the target table. Used to report how the 469 // query impacted the DDL target table. 470 DDLOperationPerformed string 471 472 // The DDL target table, present only for CREATE/DROP FUNCTION/PROCEDURE queries. 473 DDLTargetRoutine *Routine 474} 475 476// ExplainQueryStage describes one stage of a query. 477type ExplainQueryStage struct { 478 // CompletedParallelInputs: Number of parallel input segments completed. 479 CompletedParallelInputs int64 480 481 // ComputeAvg: Duration the average shard spent on CPU-bound tasks. 482 ComputeAvg time.Duration 483 484 // ComputeMax: Duration the slowest shard spent on CPU-bound tasks. 485 ComputeMax time.Duration 486 487 // Relative amount of the total time the average shard spent on CPU-bound tasks. 488 ComputeRatioAvg float64 489 490 // Relative amount of the total time the slowest shard spent on CPU-bound tasks. 491 ComputeRatioMax float64 492 493 // EndTime: Stage end time. 494 EndTime time.Time 495 496 // Unique ID for stage within plan. 497 ID int64 498 499 // InputStages: IDs for stages that are inputs to this stage. 500 InputStages []int64 501 502 // Human-readable name for stage. 503 Name string 504 505 // ParallelInputs: Number of parallel input segments to be processed. 
506 ParallelInputs int64 507 508 // ReadAvg: Duration the average shard spent reading input. 509 ReadAvg time.Duration 510 511 // ReadMax: Duration the slowest shard spent reading input. 512 ReadMax time.Duration 513 514 // Relative amount of the total time the average shard spent reading input. 515 ReadRatioAvg float64 516 517 // Relative amount of the total time the slowest shard spent reading input. 518 ReadRatioMax float64 519 520 // Number of records read into the stage. 521 RecordsRead int64 522 523 // Number of records written by the stage. 524 RecordsWritten int64 525 526 // ShuffleOutputBytes: Total number of bytes written to shuffle. 527 ShuffleOutputBytes int64 528 529 // ShuffleOutputBytesSpilled: Total number of bytes written to shuffle 530 // and spilled to disk. 531 ShuffleOutputBytesSpilled int64 532 533 // StartTime: Stage start time. 534 StartTime time.Time 535 536 // Current status for the stage. 537 Status string 538 539 // List of operations within the stage in dependency order (approximately 540 // chronological). 541 Steps []*ExplainQueryStep 542 543 // WaitAvg: Duration the average shard spent waiting to be scheduled. 544 WaitAvg time.Duration 545 546 // WaitMax: Duration the slowest shard spent waiting to be scheduled. 547 WaitMax time.Duration 548 549 // Relative amount of the total time the average shard spent waiting to be scheduled. 550 WaitRatioAvg float64 551 552 // Relative amount of the total time the slowest shard spent waiting to be scheduled. 553 WaitRatioMax float64 554 555 // WriteAvg: Duration the average shard spent on writing output. 556 WriteAvg time.Duration 557 558 // WriteMax: Duration the slowest shard spent on writing output. 559 WriteMax time.Duration 560 561 // Relative amount of the total time the average shard spent on writing output. 562 WriteRatioAvg float64 563 564 // Relative amount of the total time the slowest shard spent on writing output. 
565 WriteRatioMax float64 566} 567 568// ExplainQueryStep describes one step of a query stage. 569type ExplainQueryStep struct { 570 // Machine-readable operation type. 571 Kind string 572 573 // Human-readable stage descriptions. 574 Substeps []string 575} 576 577// QueryTimelineSample represents a sample of execution statistics at a point in time. 578type QueryTimelineSample struct { 579 580 // Total number of units currently being processed by workers, represented as largest value since last sample. 581 ActiveUnits int64 582 583 // Total parallel units of work completed by this query. 584 CompletedUnits int64 585 586 // Time elapsed since start of query execution. 587 Elapsed time.Duration 588 589 // Total parallel units of work remaining for the active stages. 590 PendingUnits int64 591 592 // Cumulative slot-milliseconds consumed by the query. 593 SlotMillis int64 594} 595 596// ReservationUsage contains information about a job's usage of a single reservation. 597type ReservationUsage struct { 598 // SlotMillis reports the slot milliseconds utilized within in the given reservation. 599 SlotMillis int64 600 // Name indicates the utilized reservation name, or "unreserved" for ondemand usage. 601 Name string 602} 603 604func bqToReservationUsage(ru []*bq.JobStatisticsReservationUsage) []*ReservationUsage { 605 var usage []*ReservationUsage 606 for _, in := range ru { 607 usage = append(usage, &ReservationUsage{ 608 SlotMillis: in.SlotMs, 609 Name: in.Name, 610 }) 611 } 612 return usage 613} 614 615// ScriptStatistics report information about script-based query jobs. 
616type ScriptStatistics struct { 617 EvaluationKind string 618 StackFrames []*ScriptStackFrame 619} 620 621func bqToScriptStatistics(bs *bq.ScriptStatistics) *ScriptStatistics { 622 if bs == nil { 623 return nil 624 } 625 ss := &ScriptStatistics{ 626 EvaluationKind: bs.EvaluationKind, 627 } 628 for _, f := range bs.StackFrames { 629 ss.StackFrames = append(ss.StackFrames, bqToScriptStackFrame(f)) 630 } 631 return ss 632} 633 634// ScriptStackFrame represents the location of the statement/expression being evaluated. 635// 636// Line and column numbers are defined as follows: 637// 638// - Line and column numbers start with one. That is, line 1 column 1 denotes 639// the start of the script. 640// - When inside a stored procedure, all line/column numbers are relative 641// to the procedure body, not the script in which the procedure was defined. 642// - Start/end positions exclude leading/trailing comments and whitespace. 643// The end position always ends with a ";", when present. 644// - Multi-byte Unicode characters are treated as just one column. 645// - If the original script (or procedure definition) contains TAB characters, 646// a tab "snaps" the indentation forward to the nearest multiple of 8 647// characters, plus 1. For example, a TAB on column 1, 2, 3, 4, 5, 6 , or 8 648// will advance the next character to column 9. A TAB on column 9, 10, 11, 649// 12, 13, 14, 15, or 16 will advance the next character to column 17. 650type ScriptStackFrame struct { 651 StartLine int64 652 StartColumn int64 653 EndLine int64 654 EndColumn int64 655 // Name of the active procedure. Empty if in a top-level script. 656 ProcedureID string 657 // Text of the current statement/expression. 
658 Text string 659} 660 661func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame { 662 if bsf == nil { 663 return nil 664 } 665 return &ScriptStackFrame{ 666 StartLine: bsf.StartLine, 667 StartColumn: bsf.StartColumn, 668 EndLine: bsf.EndLine, 669 EndColumn: bsf.EndColumn, 670 ProcedureID: bsf.ProcedureId, 671 Text: bsf.Text, 672 } 673} 674 675// DMLStatistics contains counts of row mutations triggered by a DML query statement. 676type DMLStatistics struct { 677 // Rows added by the statement. 678 InsertedRowCount int64 679 // Rows removed by the statement. 680 DeletedRowCount int64 681 // Rows changed by the statement. 682 UpdatedRowCount int64 683} 684 685func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics { 686 if q == nil { 687 return nil 688 } 689 return &DMLStatistics{ 690 InsertedRowCount: q.InsertedRowCount, 691 DeletedRowCount: q.DeletedRowCount, 692 UpdatedRowCount: q.UpdatedRowCount, 693 } 694} 695 696func (*ExtractStatistics) implementsStatistics() {} 697func (*LoadStatistics) implementsStatistics() {} 698func (*QueryStatistics) implementsStatistics() {} 699 700// Jobs lists jobs within a project. 701func (c *Client) Jobs(ctx context.Context) *JobIterator { 702 it := &JobIterator{ 703 ctx: ctx, 704 c: c, 705 ProjectID: c.projectID, 706 } 707 it.pageInfo, it.nextFunc = iterator.NewPageInfo( 708 it.fetch, 709 func() int { return len(it.items) }, 710 func() interface{} { b := it.items; it.items = nil; return b }) 711 return it 712} 713 714// JobIterator iterates over jobs in a project. 715type JobIterator struct { 716 ProjectID string // Project ID of the jobs to list. Default is the client's project. 717 AllUsers bool // Whether to list jobs owned by all users in the project, or just the current caller. 718 State State // List only jobs in the given state. Defaults to all states. 719 MinCreationTime time.Time // List only jobs created after this time. 720 MaxCreationTime time.Time // List only jobs created before this time. 
721 ParentJobID string // List only jobs that are children of a given scripting job. 722 723 ctx context.Context 724 c *Client 725 pageInfo *iterator.PageInfo 726 nextFunc func() error 727 items []*Job 728} 729 730// PageInfo is a getter for the JobIterator's PageInfo. 731func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo } 732 733// Next returns the next Job. Its second return value is iterator.Done if 734// there are no more results. Once Next returns Done, all subsequent calls will 735// return Done. 736func (it *JobIterator) Next() (*Job, error) { 737 if err := it.nextFunc(); err != nil { 738 return nil, err 739 } 740 item := it.items[0] 741 it.items = it.items[1:] 742 return item, nil 743} 744 745func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) { 746 var st string 747 switch it.State { 748 case StateUnspecified: 749 st = "" 750 case Pending: 751 st = "pending" 752 case Running: 753 st = "running" 754 case Done: 755 st = "done" 756 default: 757 return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State) 758 } 759 760 req := it.c.bqs.Jobs.List(it.ProjectID). 761 Context(it.ctx). 762 PageToken(pageToken). 763 Projection("full"). 
764 AllUsers(it.AllUsers) 765 if st != "" { 766 req.StateFilter(st) 767 } 768 if !it.MinCreationTime.IsZero() { 769 req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6)) 770 } 771 if !it.MaxCreationTime.IsZero() { 772 req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6)) 773 } 774 setClientHeader(req.Header()) 775 if pageSize > 0 { 776 req.MaxResults(int64(pageSize)) 777 } 778 if it.ParentJobID != "" { 779 req.ParentJobId(it.ParentJobID) 780 } 781 res, err := req.Do() 782 if err != nil { 783 return "", err 784 } 785 for _, j := range res.Jobs { 786 job, err := convertListedJob(j, it.c) 787 if err != nil { 788 return "", err 789 } 790 it.items = append(it.items, job) 791 } 792 return res.NextPageToken, nil 793} 794 795func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) { 796 return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c) 797} 798 799func (c *Client) getJobInternal(ctx context.Context, jobID, location string, fields ...googleapi.Field) (*bq.Job, error) { 800 var job *bq.Job 801 call := c.bqs.Jobs.Get(c.projectID, jobID).Context(ctx) 802 if location != "" { 803 call = call.Location(location) 804 } 805 if len(fields) > 0 { 806 call = call.Fields(fields...) 
807 } 808 setClientHeader(call.Header()) 809 err := runWithRetry(ctx, func() (err error) { 810 job, err = call.Do() 811 return err 812 }) 813 if err != nil { 814 return nil, err 815 } 816 return job, nil 817} 818 819func bqToJob(q *bq.Job, c *Client) (*Job, error) { 820 return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c) 821} 822 823func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) { 824 j := &Job{ 825 projectID: qr.ProjectId, 826 jobID: qr.JobId, 827 location: qr.Location, 828 c: c, 829 email: email, 830 } 831 j.setConfig(qc) 832 if err := j.setStatus(qs); err != nil { 833 return nil, err 834 } 835 j.setStatistics(qt, c) 836 return j, nil 837} 838 839func (j *Job) setConfig(config *bq.JobConfiguration) { 840 if config == nil { 841 return 842 } 843 j.config = config 844} 845 846func (j *Job) isQuery() bool { 847 return j.config != nil && j.config.Query != nil 848} 849 850var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done} 851 852func (j *Job) setStatus(qs *bq.JobStatus) error { 853 if qs == nil { 854 return nil 855 } 856 state, ok := stateMap[qs.State] 857 if !ok { 858 return fmt.Errorf("unexpected job state: %v", qs.State) 859 } 860 j.lastStatus = &JobStatus{ 861 State: state, 862 err: nil, 863 } 864 if err := bqToError(qs.ErrorResult); state == Done && err != nil { 865 j.lastStatus.err = err 866 } 867 for _, ep := range qs.Errors { 868 j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep)) 869 } 870 return nil 871} 872 873func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) { 874 if s == nil || j.lastStatus == nil { 875 return 876 } 877 js := &JobStatistics{ 878 CreationTime: unixMillisToTime(s.CreationTime), 879 StartTime: unixMillisToTime(s.StartTime), 880 EndTime: unixMillisToTime(s.EndTime), 881 TotalBytesProcessed: s.TotalBytesProcessed, 882 NumChildJobs: s.NumChildJobs, 883 
ParentJobID: s.ParentJobId, 884 ScriptStatistics: bqToScriptStatistics(s.ScriptStatistics), 885 ReservationUsage: bqToReservationUsage(s.ReservationUsage), 886 TransactionInfo: bqToTransactionInfo(s.TransactionInfo), 887 } 888 switch { 889 case s.Extract != nil: 890 js.Details = &ExtractStatistics{ 891 DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts), 892 } 893 case s.Load != nil: 894 js.Details = &LoadStatistics{ 895 InputFileBytes: s.Load.InputFileBytes, 896 InputFiles: s.Load.InputFiles, 897 OutputBytes: s.Load.OutputBytes, 898 OutputRows: s.Load.OutputRows, 899 } 900 case s.Query != nil: 901 var names []string 902 for _, qp := range s.Query.UndeclaredQueryParameters { 903 names = append(names, qp.Name) 904 } 905 var tables []*Table 906 for _, tr := range s.Query.ReferencedTables { 907 tables = append(tables, bqToTable(tr, c)) 908 } 909 js.Details = &QueryStatistics{ 910 BillingTier: s.Query.BillingTier, 911 CacheHit: s.Query.CacheHit, 912 DDLTargetTable: bqToTable(s.Query.DdlTargetTable, c), 913 DDLOperationPerformed: s.Query.DdlOperationPerformed, 914 DDLTargetRoutine: bqToRoutine(s.Query.DdlTargetRoutine, c), 915 StatementType: s.Query.StatementType, 916 TotalBytesBilled: s.Query.TotalBytesBilled, 917 TotalBytesProcessed: s.Query.TotalBytesProcessed, 918 TotalBytesProcessedAccuracy: s.Query.TotalBytesProcessedAccuracy, 919 NumDMLAffectedRows: s.Query.NumDmlAffectedRows, 920 DMLStats: bqToDMLStatistics(s.Query.DmlStats), 921 QueryPlan: queryPlanFromProto(s.Query.QueryPlan), 922 Schema: bqToSchema(s.Query.Schema), 923 SlotMillis: s.Query.TotalSlotMs, 924 Timeline: timelineFromProto(s.Query.Timeline), 925 ReferencedTables: tables, 926 UndeclaredQueryParameterNames: names, 927 } 928 } 929 j.lastStatus.Statistics = js 930} 931 932func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage { 933 var res []*ExplainQueryStage 934 for _, s := range stages { 935 var steps []*ExplainQueryStep 936 for _, p := range s.Steps { 937 
steps = append(steps, &ExplainQueryStep{ 938 Kind: p.Kind, 939 Substeps: p.Substeps, 940 }) 941 } 942 res = append(res, &ExplainQueryStage{ 943 CompletedParallelInputs: s.CompletedParallelInputs, 944 ComputeAvg: time.Duration(s.ComputeMsAvg) * time.Millisecond, 945 ComputeMax: time.Duration(s.ComputeMsMax) * time.Millisecond, 946 ComputeRatioAvg: s.ComputeRatioAvg, 947 ComputeRatioMax: s.ComputeRatioMax, 948 EndTime: time.Unix(0, s.EndMs*1e6), 949 ID: s.Id, 950 InputStages: s.InputStages, 951 Name: s.Name, 952 ParallelInputs: s.ParallelInputs, 953 ReadAvg: time.Duration(s.ReadMsAvg) * time.Millisecond, 954 ReadMax: time.Duration(s.ReadMsMax) * time.Millisecond, 955 ReadRatioAvg: s.ReadRatioAvg, 956 ReadRatioMax: s.ReadRatioMax, 957 RecordsRead: s.RecordsRead, 958 RecordsWritten: s.RecordsWritten, 959 ShuffleOutputBytes: s.ShuffleOutputBytes, 960 ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled, 961 StartTime: time.Unix(0, s.StartMs*1e6), 962 Status: s.Status, 963 Steps: steps, 964 WaitAvg: time.Duration(s.WaitMsAvg) * time.Millisecond, 965 WaitMax: time.Duration(s.WaitMsMax) * time.Millisecond, 966 WaitRatioAvg: s.WaitRatioAvg, 967 WaitRatioMax: s.WaitRatioMax, 968 WriteAvg: time.Duration(s.WriteMsAvg) * time.Millisecond, 969 WriteMax: time.Duration(s.WriteMsMax) * time.Millisecond, 970 WriteRatioAvg: s.WriteRatioAvg, 971 WriteRatioMax: s.WriteRatioMax, 972 }) 973 } 974 return res 975} 976 977func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample { 978 var res []*QueryTimelineSample 979 for _, s := range timeline { 980 res = append(res, &QueryTimelineSample{ 981 ActiveUnits: s.ActiveUnits, 982 CompletedUnits: s.CompletedUnits, 983 Elapsed: time.Duration(s.ElapsedMs) * time.Millisecond, 984 PendingUnits: s.PendingUnits, 985 SlotMillis: s.TotalSlotMs, 986 }) 987 } 988 return res 989} 990 991// TransactionInfo contains information about a multi-statement transaction that may have associated with a job. 
992type TransactionInfo struct { 993 // TransactionID is the system-generated identifier for the transaction. 994 TransactionID string 995} 996 997func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo { 998 if in == nil { 999 return nil 1000 } 1001 return &TransactionInfo{ 1002 TransactionID: in.TransactionId, 1003 } 1004} 1005