1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "fmt" 20 "io" 21 "net/http" 22 "net/url" 23 "strings" 24 "time" 25 26 "cloud.google.com/go/internal" 27 "cloud.google.com/go/internal/detect" 28 "cloud.google.com/go/internal/version" 29 gax "github.com/googleapis/gax-go/v2" 30 bq "google.golang.org/api/bigquery/v2" 31 "google.golang.org/api/googleapi" 32 "google.golang.org/api/option" 33) 34 35const ( 36 // Scope is the Oauth2 scope for the service. 37 // For relevant BigQuery scopes, see: 38 // https://developers.google.com/identity/protocols/googlescopes#bigqueryv2 39 Scope = "https://www.googleapis.com/auth/bigquery" 40 userAgentPrefix = "gcloud-golang-bigquery" 41) 42 43var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), version.Repo) 44 45func setClientHeader(headers http.Header) { 46 headers.Set("x-goog-api-client", xGoogHeader) 47} 48 49// Client may be used to perform BigQuery operations. 50type Client struct { 51 // Location, if set, will be used as the default location for all subsequent 52 // dataset creation and job operations. A location specified directly in one of 53 // those operations will override this value. 54 Location string 55 56 projectID string 57 bqs *bq.Service 58} 59 60// DetectProjectID is a sentinel value that instructs NewClient to detect the 61// project ID. It is given in place of the projectID argument. NewClient will 62// use the project ID from the given credentials or the default credentials 63// (https://developers.google.com/accounts/docs/application-default-credentials) 64// if no credentials were provided. When providing credentials, not all 65// options will allow NewClient to extract the project ID. Specifically a JWT 66// does not have the project ID encoded. 67const DetectProjectID = "*detect-project-id*" 68 69// NewClient constructs a new Client which can perform BigQuery operations. 70// Operations performed via the client are billed to the specified GCP project. 71// 72// If the project ID is set to DetectProjectID, NewClient will attempt to detect 73// the project ID from credentials. 74func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) { 75 o := []option.ClientOption{ 76 option.WithScopes(Scope), 77 option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, version.Repo)), 78 } 79 o = append(o, opts...) 80 bqs, err := bq.NewService(ctx, o...) 81 if err != nil { 82 return nil, fmt.Errorf("bigquery: constructing client: %v", err) 83 } 84 85 // Handle project autodetection. 86 projectID, err = detect.ProjectID(ctx, projectID, "", opts...) 87 if err != nil { 88 return nil, err 89 } 90 91 c := &Client{ 92 projectID: projectID, 93 bqs: bqs, 94 } 95 return c, nil 96} 97 98// Project returns the project ID or number for this instance of the client, which may have 99// either been explicitly specified or autodetected. 100func (c *Client) Project() string { 101 return c.projectID 102} 103 104// Close closes any resources held by the client. 105// Close should be called when the client is no longer needed. 106// It need not be called at program exit. 107func (c *Client) Close() error { 108 return nil 109} 110 111// Calls the Jobs.Insert RPC and returns a Job. 112func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*Job, error) { 113 call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx) 114 setClientHeader(call.Header()) 115 if media != nil { 116 call.Media(media) 117 } 118 var res *bq.Job 119 var err error 120 invoke := func() error { 121 res, err = call.Do() 122 return err 123 } 124 // A job with a client-generated ID can be retried; the presence of the 125 // ID makes the insert operation idempotent. 126 // We don't retry if there is media, because it is an io.Reader. We'd 127 // have to read the contents and keep it in memory, and that could be expensive. 128 // TODO(jba): Look into retrying if media != nil. 129 if job.JobReference != nil && media == nil { 130 err = runWithRetry(ctx, invoke) 131 } else { 132 err = invoke() 133 } 134 if err != nil { 135 return nil, err 136 } 137 return bqToJob(res, c) 138} 139 140// runQuery invokes the optimized query path. 141// Due to differences in options it supports, it cannot be used for all existing 142// jobs.insert requests that are query jobs. 143func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*bq.QueryResponse, error) { 144 call := c.bqs.Jobs.Query(c.projectID, queryRequest) 145 setClientHeader(call.Header()) 146 147 var res *bq.QueryResponse 148 var err error 149 invoke := func() error { 150 res, err = call.Do() 151 return err 152 } 153 154 // We control request ID, so we can always runWithRetry. 155 err = runWithRetry(ctx, invoke) 156 if err != nil { 157 return nil, err 158 } 159 return res, nil 160} 161 162// Convert a number of milliseconds since the Unix epoch to a time.Time. 163// Treat an input of zero specially: convert it to the zero time, 164// rather than the start of the epoch. 165func unixMillisToTime(m int64) time.Time { 166 if m == 0 { 167 return time.Time{} 168 } 169 return time.Unix(0, m*1e6) 170} 171 172// runWithRetry calls the function until it returns nil or a non-retryable error, or 173// the context is done. 174// See the similar function in ../storage/invoke.go. The main difference is the 175// reason for retrying. 176func runWithRetry(ctx context.Context, call func() error) error { 177 // These parameters match the suggestions in https://cloud.google.com/bigquery/sla. 178 backoff := gax.Backoff{ 179 Initial: 1 * time.Second, 180 Max: 32 * time.Second, 181 Multiplier: 2, 182 } 183 return internal.Retry(ctx, backoff, func() (stop bool, err error) { 184 err = call() 185 if err == nil { 186 return true, nil 187 } 188 return !retryableError(err), err 189 }) 190} 191 192// This is the correct definition of retryable according to the BigQuery team. It 193// also considers 502 ("Bad Gateway") and 503 ("Service Unavailable") errors 194// retryable; these are returned by systems between the client and the BigQuery 195// service. 196func retryableError(err error) bool { 197 if err == nil { 198 return false 199 } 200 if err == io.ErrUnexpectedEOF { 201 return true 202 } 203 // Special case due to http2: https://github.com/googleapis/google-cloud-go/issues/1793 204 // Due to Go's default being higher for streams-per-connection than is accepted by the 205 // BQ backend, it's possible to get streams refused immediately after a connection is 206 // started but before we receive SETTINGS frame from the backend. This generally only 207 // happens when we try to enqueue > 100 requests onto a newly initiated connection. 208 if err.Error() == "http2: stream closed" { 209 return true 210 } 211 212 switch e := err.(type) { 213 case *googleapi.Error: 214 // We received a structured error from backend. 215 var reason string 216 if len(e.Errors) > 0 { 217 reason = e.Errors[0].Reason 218 } 219 if e.Code == http.StatusServiceUnavailable || e.Code == http.StatusBadGateway || reason == "backendError" || reason == "rateLimitExceeded" { 220 return true 221 } 222 case *url.Error: 223 retryable := []string{"connection refused", "connection reset"} 224 for _, s := range retryable { 225 if strings.Contains(e.Error(), s) { 226 return true 227 } 228 } 229 case interface{ Temporary() bool }: 230 if e.Temporary() { 231 return true 232 } 233 } 234 // Unwrap is only supported in go1.13.x+ 235 if e, ok := err.(interface{ Unwrap() error }); ok { 236 return retryableError(e.Unwrap()) 237 } 238 return false 239} 240