1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"fmt"
20	"io"
21	"net/http"
22	"time"
23
24	"cloud.google.com/go/internal"
25	"cloud.google.com/go/internal/version"
26	gax "github.com/googleapis/gax-go/v2"
27	bq "google.golang.org/api/bigquery/v2"
28	"google.golang.org/api/googleapi"
29	"google.golang.org/api/option"
30)
31
const (
	// Scope is the OAuth2 scope for the BigQuery service. NewClient passes
	// it as a client option ahead of any caller-supplied options.
	// The BigQuery-related scopes are listed at:
	// https://developers.google.com/identity/protocols/googlescopes#bigqueryv2
	Scope = "https://www.googleapis.com/auth/bigquery"

	// userAgentPrefix is the leading part of the user agent string that
	// NewClient builds; the repo version is appended after a slash.
	userAgentPrefix = "gcloud-golang-bigquery"
)
39
40var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), version.Repo)
41
42func setClientHeader(headers http.Header) {
43	headers.Set("x-goog-api-client", xGoogHeader)
44}
45
// Client may be used to perform BigQuery operations.
// Use NewClient to construct one.
type Client struct {
	// Location, if set, will be used as the default location for all subsequent
	// dataset creation and job operations. A location specified directly in one of
	// those operations will override this value.
	Location string

	// projectID is the project passed to NewClient; insertJob uses it as the
	// project for Jobs.Insert calls.
	projectID string
	// bqs is the underlying BigQuery API service created by NewClient.
	bqs *bq.Service
}
56
57// NewClient constructs a new Client which can perform BigQuery operations.
58// Operations performed via the client are billed to the specified GCP project.
59func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
60	o := []option.ClientOption{
61		option.WithScopes(Scope),
62		option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, version.Repo)),
63	}
64	o = append(o, opts...)
65	bqs, err := bq.NewService(ctx, o...)
66	if err != nil {
67		return nil, fmt.Errorf("bigquery: constructing client: %v", err)
68	}
69	c := &Client{
70		projectID: projectID,
71		bqs:       bqs,
72	}
73	return c, nil
74}
75
// Close closes any resources held by the client.
// Close should be called when the client is no longer needed.
// It need not be called at program exit.
//
// The current implementation performs no work and always returns nil.
func (c *Client) Close() error {
	return nil
}
82
83// Calls the Jobs.Insert RPC and returns a Job.
84func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*Job, error) {
85	call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx)
86	setClientHeader(call.Header())
87	if media != nil {
88		call.Media(media)
89	}
90	var res *bq.Job
91	var err error
92	invoke := func() error {
93		res, err = call.Do()
94		return err
95	}
96	// A job with a client-generated ID can be retried; the presence of the
97	// ID makes the insert operation idempotent.
98	// We don't retry if there is media, because it is an io.Reader. We'd
99	// have to read the contents and keep it in memory, and that could be expensive.
100	// TODO(jba): Look into retrying if media != nil.
101	if job.JobReference != nil && media == nil {
102		err = runWithRetry(ctx, invoke)
103	} else {
104		err = invoke()
105	}
106	if err != nil {
107		return nil, err
108	}
109	return bqToJob(res, c)
110}
111
// unixMillisToTime converts a number of milliseconds since the Unix epoch to
// a time.Time.
// An input of zero is treated specially: it is converted to the zero time,
// rather than the start of the epoch.
//
// The value is split into whole seconds and a sub-second remainder before
// being handed to time.Unix. Multiplying the full millisecond count by 1e6
// would overflow int64 for values beyond roughly the year 2262.
func unixMillisToTime(m int64) time.Time {
	if m == 0 {
		return time.Time{}
	}
	const millisPerSecond = 1000
	// time.Unix accepts a nanosecond argument outside [0, 999999999], so a
	// negative remainder (for times before the epoch) is handled correctly.
	return time.Unix(m/millisPerSecond, (m%millisPerSecond)*int64(time.Millisecond))
}
121
122// runWithRetry calls the function until it returns nil or a non-retryable error, or
123// the context is done.
124// See the similar function in ../storage/invoke.go. The main difference is the
125// reason for retrying.
126func runWithRetry(ctx context.Context, call func() error) error {
127	// These parameters match the suggestions in https://cloud.google.com/bigquery/sla.
128	backoff := gax.Backoff{
129		Initial:    1 * time.Second,
130		Max:        32 * time.Second,
131		Multiplier: 2,
132	}
133	return internal.Retry(ctx, backoff, func() (stop bool, err error) {
134		err = call()
135		if err == nil {
136			return true, nil
137		}
138		return !retryableError(err), err
139	})
140}
141
142// This is the correct definition of retryable according to the BigQuery team. It
143// also considers 502 ("Bad Gateway") and 503 ("Service Unavailable") errors
144// retryable; these are returned by systems between the client and the BigQuery
145// service.
146func retryableError(err error) bool {
147	e, ok := err.(*googleapi.Error)
148	if !ok {
149		return false
150	}
151	var reason string
152	if len(e.Errors) > 0 {
153		reason = e.Errors[0].Reason
154	}
155	return e.Code == http.StatusServiceUnavailable || e.Code == http.StatusBadGateway || reason == "backendError" || reason == "rateLimitExceeded"
156}
157