1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "fmt" 20 "io" 21 "net/http" 22 "time" 23 24 "cloud.google.com/go/internal" 25 "cloud.google.com/go/internal/version" 26 gax "github.com/googleapis/gax-go/v2" 27 bq "google.golang.org/api/bigquery/v2" 28 "google.golang.org/api/googleapi" 29 "google.golang.org/api/option" 30) 31 32const ( 33 // Scope is the Oauth2 scope for the service. 34 // For relevant BigQuery scopes, see: 35 // https://developers.google.com/identity/protocols/googlescopes#bigqueryv2 36 Scope = "https://www.googleapis.com/auth/bigquery" 37 userAgentPrefix = "gcloud-golang-bigquery" 38) 39 40var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), version.Repo) 41 42func setClientHeader(headers http.Header) { 43 headers.Set("x-goog-api-client", xGoogHeader) 44} 45 46// Client may be used to perform BigQuery operations. 47type Client struct { 48 // Location, if set, will be used as the default location for all subsequent 49 // dataset creation and job operations. A location specified directly in one of 50 // those operations will override this value. 51 Location string 52 53 projectID string 54 bqs *bq.Service 55} 56 57// NewClient constructs a new Client which can perform BigQuery operations. 58// Operations performed via the client are billed to the specified GCP project. 59func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) { 60 o := []option.ClientOption{ 61 option.WithScopes(Scope), 62 option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, version.Repo)), 63 } 64 o = append(o, opts...) 65 bqs, err := bq.NewService(ctx, o...) 66 if err != nil { 67 return nil, fmt.Errorf("bigquery: constructing client: %v", err) 68 } 69 c := &Client{ 70 projectID: projectID, 71 bqs: bqs, 72 } 73 return c, nil 74} 75 76// Close closes any resources held by the client. 77// Close should be called when the client is no longer needed. 78// It need not be called at program exit. 79func (c *Client) Close() error { 80 return nil 81} 82 83// Calls the Jobs.Insert RPC and returns a Job. 84func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*Job, error) { 85 call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx) 86 setClientHeader(call.Header()) 87 if media != nil { 88 call.Media(media) 89 } 90 var res *bq.Job 91 var err error 92 invoke := func() error { 93 res, err = call.Do() 94 return err 95 } 96 // A job with a client-generated ID can be retried; the presence of the 97 // ID makes the insert operation idempotent. 98 // We don't retry if there is media, because it is an io.Reader. We'd 99 // have to read the contents and keep it in memory, and that could be expensive. 100 // TODO(jba): Look into retrying if media != nil. 101 if job.JobReference != nil && media == nil { 102 err = runWithRetry(ctx, invoke) 103 } else { 104 err = invoke() 105 } 106 if err != nil { 107 return nil, err 108 } 109 return bqToJob(res, c) 110} 111 112// Convert a number of milliseconds since the Unix epoch to a time.Time. 113// Treat an input of zero specially: convert it to the zero time, 114// rather than the start of the epoch. 115func unixMillisToTime(m int64) time.Time { 116 if m == 0 { 117 return time.Time{} 118 } 119 return time.Unix(0, m*1e6) 120} 121 122// runWithRetry calls the function until it returns nil or a non-retryable error, or 123// the context is done. 124// See the similar function in ../storage/invoke.go. The main difference is the 125// reason for retrying. 126func runWithRetry(ctx context.Context, call func() error) error { 127 // These parameters match the suggestions in https://cloud.google.com/bigquery/sla. 128 backoff := gax.Backoff{ 129 Initial: 1 * time.Second, 130 Max: 32 * time.Second, 131 Multiplier: 2, 132 } 133 return internal.Retry(ctx, backoff, func() (stop bool, err error) { 134 err = call() 135 if err == nil { 136 return true, nil 137 } 138 return !retryableError(err), err 139 }) 140} 141 142// This is the correct definition of retryable according to the BigQuery team. It 143// also considers 502 ("Bad Gateway") and 503 ("Service Unavailable") errors 144// retryable; these are returned by systems between the client and the BigQuery 145// service. 146func retryableError(err error) bool { 147 e, ok := err.(*googleapi.Error) 148 if !ok { 149 return false 150 } 151 var reason string 152 if len(e.Errors) > 0 { 153 reason = e.Errors[0].Reason 154 } 155 return e.Code == http.StatusServiceUnavailable || e.Code == http.StatusBadGateway || reason == "backendError" || reason == "rateLimitExceeded" 156} 157