1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"fmt"
20	"io"
21	"net/http"
22	"net/url"
23	"strings"
24	"time"
25
26	"cloud.google.com/go/internal"
27	"cloud.google.com/go/internal/detect"
28	"cloud.google.com/go/internal/version"
29	gax "github.com/googleapis/gax-go/v2"
30	bq "google.golang.org/api/bigquery/v2"
31	"google.golang.org/api/googleapi"
32	"google.golang.org/api/option"
33)
34
const (
	// Scope is the OAuth2 scope for the service.
	// For relevant BigQuery scopes, see:
	// https://developers.google.com/identity/protocols/googlescopes#bigqueryv2
	Scope           = "https://www.googleapis.com/auth/bigquery"
	// userAgentPrefix is the leading component of the user agent string that
	// NewClient passes to option.WithUserAgent; the repo version is appended
	// after a slash.
	userAgentPrefix = "gcloud-golang-bigquery"
)
42
// xGoogHeader is the value setClientHeader writes into the x-goog-api-client
// header. It is computed once at package initialization from version.Go() and
// version.Repo.
var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), version.Repo)
44
45func setClientHeader(headers http.Header) {
46	headers.Set("x-goog-api-client", xGoogHeader)
47}
48
// Client may be used to perform BigQuery operations.
// Construct one with NewClient.
type Client struct {
	// Location, if set, will be used as the default location for all subsequent
	// dataset creation and job operations. A location specified directly in one of
	// those operations will override this value.
	Location string

	// projectID is the project the client issues requests against; it is the
	// value returned by Project and is set by NewClient.
	projectID string
	// bqs is the underlying BigQuery API service used to build and send calls.
	bqs       *bq.Service
}
59
// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When providing credentials, not all
// options will allow NewClient to extract the project ID. Specifically, a JWT
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"
68
69// NewClient constructs a new Client which can perform BigQuery operations.
70// Operations performed via the client are billed to the specified GCP project.
71//
72// If the project ID is set to DetectProjectID, NewClient will attempt to detect
73// the project ID from credentials.
74func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
75	o := []option.ClientOption{
76		option.WithScopes(Scope),
77		option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, version.Repo)),
78	}
79	o = append(o, opts...)
80	bqs, err := bq.NewService(ctx, o...)
81	if err != nil {
82		return nil, fmt.Errorf("bigquery: constructing client: %v", err)
83	}
84
85	// Handle project autodetection.
86	projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
87	if err != nil {
88		return nil, err
89	}
90
91	c := &Client{
92		projectID: projectID,
93		bqs:       bqs,
94	}
95	return c, nil
96}
97
// Project returns the project ID or number for this instance of the client,
// which may have either been explicitly specified or autodetected.
// It is the value stored on the client by NewClient.
func (c *Client) Project() string {
	return c.projectID
}
103
// Close closes any resources held by the client.
// Close should be called when the client is no longer needed.
// It need not be called at program exit.
//
// The current implementation releases nothing and always returns nil.
func (c *Client) Close() error {
	return nil
}
110
111// Calls the Jobs.Insert RPC and returns a Job.
112func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*Job, error) {
113	call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx)
114	setClientHeader(call.Header())
115	if media != nil {
116		call.Media(media)
117	}
118	var res *bq.Job
119	var err error
120	invoke := func() error {
121		res, err = call.Do()
122		return err
123	}
124	// A job with a client-generated ID can be retried; the presence of the
125	// ID makes the insert operation idempotent.
126	// We don't retry if there is media, because it is an io.Reader. We'd
127	// have to read the contents and keep it in memory, and that could be expensive.
128	// TODO(jba): Look into retrying if media != nil.
129	if job.JobReference != nil && media == nil {
130		err = runWithRetry(ctx, invoke)
131	} else {
132		err = invoke()
133	}
134	if err != nil {
135		return nil, err
136	}
137	return bqToJob(res, c)
138}
139
140// runQuery invokes the optimized query path.
141// Due to differences in options it supports, it cannot be used for all existing
142// jobs.insert requests that are query jobs.
143func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*bq.QueryResponse, error) {
144	call := c.bqs.Jobs.Query(c.projectID, queryRequest)
145	setClientHeader(call.Header())
146
147	var res *bq.QueryResponse
148	var err error
149	invoke := func() error {
150		res, err = call.Do()
151		return err
152	}
153
154	// We control request ID, so we can always runWithRetry.
155	err = runWithRetry(ctx, invoke)
156	if err != nil {
157		return nil, err
158	}
159	return res, nil
160}
161
// unixMillisToTime converts m, a number of milliseconds since the Unix epoch,
// to a time.Time. An input of zero is treated specially: it is converted to
// the zero time.Time rather than the start of the epoch.
func unixMillisToTime(m int64) time.Time {
	if m == 0 {
		return time.Time{}
	}
	// Split into whole seconds plus a sub-second remainder before scaling to
	// nanoseconds. Scaling m directly (m*1e6) overflows int64 once m exceeds
	// roughly 9.2e12, i.e. for dates past the year 2262. time.Unix accepts a
	// negative nanosecond remainder, so negative m needs no special handling.
	const millisPerSecond = 1000
	return time.Unix(m/millisPerSecond, (m%millisPerSecond)*int64(time.Millisecond))
}
171
172// runWithRetry calls the function until it returns nil or a non-retryable error, or
173// the context is done.
174// See the similar function in ../storage/invoke.go. The main difference is the
175// reason for retrying.
176func runWithRetry(ctx context.Context, call func() error) error {
177	// These parameters match the suggestions in https://cloud.google.com/bigquery/sla.
178	backoff := gax.Backoff{
179		Initial:    1 * time.Second,
180		Max:        32 * time.Second,
181		Multiplier: 2,
182	}
183	return internal.Retry(ctx, backoff, func() (stop bool, err error) {
184		err = call()
185		if err == nil {
186			return true, nil
187		}
188		return !retryableError(err), err
189	})
190}
191
192// This is the correct definition of retryable according to the BigQuery team. It
193// also considers 502 ("Bad Gateway") and 503 ("Service Unavailable") errors
194// retryable; these are returned by systems between the client and the BigQuery
195// service.
196func retryableError(err error) bool {
197	if err == nil {
198		return false
199	}
200	if err == io.ErrUnexpectedEOF {
201		return true
202	}
203	// Special case due to http2: https://github.com/googleapis/google-cloud-go/issues/1793
204	// Due to Go's default being higher for streams-per-connection than is accepted by the
205	// BQ backend, it's possible to get streams refused immediately after a connection is
206	// started but before we receive SETTINGS frame from the backend.  This generally only
207	// happens when we try to enqueue > 100 requests onto a newly initiated connection.
208	if err.Error() == "http2: stream closed" {
209		return true
210	}
211
212	switch e := err.(type) {
213	case *googleapi.Error:
214		// We received a structured error from backend.
215		var reason string
216		if len(e.Errors) > 0 {
217			reason = e.Errors[0].Reason
218		}
219		if e.Code == http.StatusServiceUnavailable || e.Code == http.StatusBadGateway || reason == "backendError" || reason == "rateLimitExceeded" {
220			return true
221		}
222	case *url.Error:
223		retryable := []string{"connection refused", "connection reset"}
224		for _, s := range retryable {
225			if strings.Contains(e.Error(), s) {
226				return true
227			}
228		}
229	case interface{ Temporary() bool }:
230		if e.Temporary() {
231			return true
232		}
233	}
234	// Unwrap is only supported in go1.13.x+
235	if e, ok := err.(interface{ Unwrap() error }); ok {
236		return retryableError(e.Unwrap())
237	}
238	return false
239}
240