1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"errors"
20	"fmt"
21
22	"cloud.google.com/go/internal/trace"
23	"cloud.google.com/go/internal/uid"
24	bq "google.golang.org/api/bigquery/v2"
25)
26
// QueryConfig holds the configuration for a query job.
type QueryConfig struct {
	// Dst is the table into which the results of the query will be written.
	// If this field is nil, a temporary table will be created.
	Dst *Table

	// Q is the query to execute. See https://cloud.google.com/bigquery/query-reference for details.
	Q string

	// DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query.
	// If DefaultProjectID is set, DefaultDatasetID must also be set.
	DefaultProjectID string
	DefaultDatasetID string

	// TableDefinitions describes data sources outside of BigQuery.
	// The map keys may be used as table names in the query string.
	//
	// When a QueryConfig is returned from Job.Config, the map values
	// are always of type *ExternalDataConfig.
	TableDefinitions map[string]ExternalData

	// CreateDisposition specifies the circumstances under which the destination table will be created.
	// The default is CreateIfNeeded.
	CreateDisposition TableCreateDisposition

	// WriteDisposition specifies how existing data in the destination table is treated.
	// The default is WriteEmpty.
	WriteDisposition TableWriteDisposition

	// DisableQueryCache prevents results being fetched from the query cache.
	// If this field is false, results are fetched from the cache if they are available.
	// The query cache is a best-effort cache that is flushed whenever tables in the query are modified.
	// Cached results are only available when TableID is unspecified in the query's destination Table.
	// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching
	DisableQueryCache bool

	// DisableFlattenedResults prevents results being flattened.
	// If this field is false, results from nested and repeated fields are flattened.
	// DisableFlattenedResults implies AllowLargeResults.
	// For more information, see https://cloud.google.com/bigquery/docs/data#nested
	DisableFlattenedResults bool

	// AllowLargeResults allows the query to produce arbitrarily large result tables.
	// The destination must be a table.
	// When using this option, queries will take longer to execute, even if the result set is small.
	// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults
	AllowLargeResults bool

	// Priority specifies the priority with which to schedule the query.
	// The default priority is InteractivePriority.
	// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries
	Priority QueryPriority

	// MaxBillingTier sets the maximum billing tier for a Query.
	// Queries that have resource usage beyond this tier will fail (without
	// incurring a charge). If this field is zero, the project default will be used.
	MaxBillingTier int

	// MaxBytesBilled limits the number of bytes billed for
	// this job. Queries that would exceed this limit will fail (without incurring
	// a charge).
	// If this field is less than 1, the project default will be
	// used.
	MaxBytesBilled int64

	// UseStandardSQL causes the query to use standard SQL. This is the default.
	// It is an error to set both UseStandardSQL and UseLegacySQL.
	//
	// Deprecated: use UseLegacySQL.
	UseStandardSQL bool

	// UseLegacySQL causes the query to use legacy SQL.
	// It is an error to combine UseLegacySQL with UseStandardSQL or with
	// a non-empty Parameters list.
	UseLegacySQL bool

	// Parameters is a list of query parameters. The presence of parameters
	// implies the use of standard SQL.
	// If the query uses positional syntax ("?"), then no parameter may have a name.
	// If the query uses named syntax ("@p"), then all parameters must have names.
	// It is illegal to mix positional and named syntax.
	Parameters []QueryParameter

	// TimePartitioning specifies time-based partitioning
	// for the destination table.
	TimePartitioning *TimePartitioning

	// RangePartitioning specifies integer range-based partitioning
	// for the destination table.
	RangePartitioning *RangePartitioning

	// Clustering specifies the data clustering configuration for the destination table.
	Clustering *Clustering

	// Labels are the labels associated with this job.
	Labels map[string]string

	// DryRun, if true, prevents the job from actually being run. A valid query
	// will return a mostly empty response with some processing statistics, while
	// an invalid query will return the same error it would if it wasn't a dry run.
	//
	// Query.Read will fail with dry-run queries. Call Query.Run instead, and then
	// call LastStatus on the returned job to get statistics. Calling Status on a
	// dry-run job will fail.
	DryRun bool

	// DestinationEncryptionConfig is the custom encryption configuration
	// (e.g., Cloud KMS keys).
	DestinationEncryptionConfig *EncryptionConfig

	// SchemaUpdateOptions allows the schema of the destination table to be
	// updated as a side effect of the query job.
	SchemaUpdateOptions []string
}
136
137func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
138	qconf := &bq.JobConfigurationQuery{
139		Query:                              qc.Q,
140		CreateDisposition:                  string(qc.CreateDisposition),
141		WriteDisposition:                   string(qc.WriteDisposition),
142		AllowLargeResults:                  qc.AllowLargeResults,
143		Priority:                           string(qc.Priority),
144		MaximumBytesBilled:                 qc.MaxBytesBilled,
145		TimePartitioning:                   qc.TimePartitioning.toBQ(),
146		RangePartitioning:                  qc.RangePartitioning.toBQ(),
147		Clustering:                         qc.Clustering.toBQ(),
148		DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
149		SchemaUpdateOptions:                qc.SchemaUpdateOptions,
150	}
151	if len(qc.TableDefinitions) > 0 {
152		qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
153	}
154	for name, data := range qc.TableDefinitions {
155		qconf.TableDefinitions[name] = data.toBQ()
156	}
157	if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" {
158		qconf.DefaultDataset = &bq.DatasetReference{
159			DatasetId: qc.DefaultDatasetID,
160			ProjectId: qc.DefaultProjectID,
161		}
162	}
163	if tier := int64(qc.MaxBillingTier); tier > 0 {
164		qconf.MaximumBillingTier = &tier
165	}
166	f := false
167	if qc.DisableQueryCache {
168		qconf.UseQueryCache = &f
169	}
170	if qc.DisableFlattenedResults {
171		qconf.FlattenResults = &f
172		// DisableFlattenResults implies AllowLargeResults.
173		qconf.AllowLargeResults = true
174	}
175	if qc.UseStandardSQL && qc.UseLegacySQL {
176		return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
177	}
178	if len(qc.Parameters) > 0 && qc.UseLegacySQL {
179		return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL")
180	}
181	ptrue := true
182	pfalse := false
183	if qc.UseLegacySQL {
184		qconf.UseLegacySql = &ptrue
185	} else {
186		qconf.UseLegacySql = &pfalse
187	}
188	if qc.Dst != nil && !qc.Dst.implicitTable() {
189		qconf.DestinationTable = qc.Dst.toBQ()
190	}
191	for _, p := range qc.Parameters {
192		qp, err := p.toBQ()
193		if err != nil {
194			return nil, err
195		}
196		qconf.QueryParameters = append(qconf.QueryParameters, qp)
197	}
198	return &bq.JobConfiguration{
199		Labels: qc.Labels,
200		DryRun: qc.DryRun,
201		Query:  qconf,
202	}, nil
203}
204
205func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
206	qq := q.Query
207	qc := &QueryConfig{
208		Labels:                      q.Labels,
209		DryRun:                      q.DryRun,
210		Q:                           qq.Query,
211		CreateDisposition:           TableCreateDisposition(qq.CreateDisposition),
212		WriteDisposition:            TableWriteDisposition(qq.WriteDisposition),
213		AllowLargeResults:           qq.AllowLargeResults,
214		Priority:                    QueryPriority(qq.Priority),
215		MaxBytesBilled:              qq.MaximumBytesBilled,
216		UseLegacySQL:                qq.UseLegacySql == nil || *qq.UseLegacySql,
217		TimePartitioning:            bqToTimePartitioning(qq.TimePartitioning),
218		RangePartitioning:           bqToRangePartitioning(qq.RangePartitioning),
219		Clustering:                  bqToClustering(qq.Clustering),
220		DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
221		SchemaUpdateOptions:         qq.SchemaUpdateOptions,
222	}
223	qc.UseStandardSQL = !qc.UseLegacySQL
224
225	if len(qq.TableDefinitions) > 0 {
226		qc.TableDefinitions = make(map[string]ExternalData)
227	}
228	for name, qedc := range qq.TableDefinitions {
229		edc, err := bqToExternalDataConfig(&qedc)
230		if err != nil {
231			return nil, err
232		}
233		qc.TableDefinitions[name] = edc
234	}
235	if qq.DefaultDataset != nil {
236		qc.DefaultProjectID = qq.DefaultDataset.ProjectId
237		qc.DefaultDatasetID = qq.DefaultDataset.DatasetId
238	}
239	if qq.MaximumBillingTier != nil {
240		qc.MaxBillingTier = int(*qq.MaximumBillingTier)
241	}
242	if qq.UseQueryCache != nil && !*qq.UseQueryCache {
243		qc.DisableQueryCache = true
244	}
245	if qq.FlattenResults != nil && !*qq.FlattenResults {
246		qc.DisableFlattenedResults = true
247	}
248	if qq.DestinationTable != nil {
249		qc.Dst = bqToTable(qq.DestinationTable, c)
250	}
251	for _, qp := range qq.QueryParameters {
252		p, err := bqToQueryParameter(qp)
253		if err != nil {
254			return nil, err
255		}
256		qc.Parameters = append(qc.Parameters, p)
257	}
258	return qc, nil
259}
260
// QueryPriority specifies a priority with which a query is to be executed.
// See BatchPriority and InteractivePriority for the defined values.
type QueryPriority string

const (
	// BatchPriority specifies that the query should be scheduled with the
	// batch priority. BigQuery queues each batch query on your behalf, and
	// starts the query as soon as idle resources are available, usually within
	// a few minutes. If BigQuery hasn't started the query within 24 hours,
	// BigQuery changes the job priority to interactive. Batch queries don't
	// count towards your concurrent rate limit, which can make it easier to
	// start many queries at once.
	//
	// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#batchqueries.
	BatchPriority QueryPriority = "BATCH"
	// InteractivePriority specifies that the query should be scheduled with
	// interactive priority, which means that the query is executed as soon as
	// possible. Interactive queries count towards your concurrent rate limit
	// and your daily limit. It is the default priority with which queries get
	// executed.
	//
	// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#queries.
	InteractivePriority QueryPriority = "INTERACTIVE"
)
284
// A Query queries data from a BigQuery table. Use Client.Query to create a Query.
//
// The embedded JobIDConfig and QueryConfig are exported so that a Query can
// be further configured before Run or Read is called.
type Query struct {
	JobIDConfig
	QueryConfig
	// client is the Client that created this Query; Run and Read issue
	// their requests through it.
	client *Client
}
291
292// Query creates a query with string q.
293// The returned Query may optionally be further configured before its Run method is called.
294func (c *Client) Query(q string) *Query {
295	return &Query{
296		client:      c,
297		QueryConfig: QueryConfig{Q: q},
298	}
299}
300
301// Run initiates a query job.
302func (q *Query) Run(ctx context.Context) (j *Job, err error) {
303	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
304	defer func() { trace.EndSpan(ctx, err) }()
305
306	job, err := q.newJob()
307	if err != nil {
308		return nil, err
309	}
310	j, err = q.client.insertJob(ctx, job, nil)
311	if err != nil {
312		return nil, err
313	}
314	return j, nil
315}
316
317func (q *Query) newJob() (*bq.Job, error) {
318	config, err := q.QueryConfig.toBQ()
319	if err != nil {
320		return nil, err
321	}
322	return &bq.Job{
323		JobReference:  q.JobIDConfig.createJobRef(q.client),
324		Configuration: config,
325	}, nil
326}
327
328// Read submits a query for execution and returns the results via a RowIterator.
329// If the request can be satisfied by running using the optimized query path, it
330// is used in place of the jobs.insert path as this path does not expose a job
331// object.
332func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) {
333	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
334	defer func() { trace.EndSpan(ctx, err) }()
335	queryRequest, err := q.probeFastPath()
336	if err != nil {
337		// Any error means we fallback to the older mechanism.
338		job, err := q.Run(ctx)
339		if err != nil {
340			return nil, err
341		}
342		return job.Read(ctx)
343	}
344	// we have a config, run on fastPath.
345	resp, err := q.client.runQuery(ctx, queryRequest)
346	if err != nil {
347		return nil, err
348	}
349	// construct a minimal job for backing the row iterator.
350	minimalJob := &Job{
351		c:         q.client,
352		jobID:     resp.JobReference.JobId,
353		location:  resp.JobReference.Location,
354		projectID: resp.JobReference.ProjectId,
355	}
356	if resp.JobComplete {
357		rowSource := &rowSource{
358			j: minimalJob,
359			// RowIterator can precache results from the iterator to save a lookup.
360			cachedRows:      resp.Rows,
361			cachedSchema:    resp.Schema,
362			cachedNextToken: resp.PageToken,
363		}
364		return newRowIterator(ctx, rowSource, fetchPage), nil
365	}
366	// We're on the fastPath, but we need to poll because the job is incomplete.
367	// Fallback to job-based Read().
368	//
369	// (Issue 2937) In order to satisfy basic probing of the job in classic path,
370	// we need to supply additional config which is probed for presence, not contents.
371	//
372	minimalJob.config = &bq.JobConfiguration{
373		Query: &bq.JobConfigurationQuery{},
374	}
375
376	return minimalJob.Read(ctx)
377}
378
379// probeFastPath is used to attempt configuring a jobs.Query request based on a
380// user's Query configuration.  If all the options set on the job are supported on the
381// faster query path, this method returns a QueryRequest suitable for execution.
382func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
383	// This is a denylist of settings which prevent us from composing an equivalent
384	// bq.QueryRequest due to differences between configuration parameters accepted
385	// by jobs.insert vs jobs.query.
386	if q.QueryConfig.Dst != nil ||
387		q.QueryConfig.TableDefinitions != nil ||
388		q.QueryConfig.CreateDisposition != "" ||
389		q.QueryConfig.WriteDisposition != "" ||
390		!(q.QueryConfig.Priority == "" || q.QueryConfig.Priority == InteractivePriority) ||
391		q.QueryConfig.UseLegacySQL ||
392		q.QueryConfig.MaxBillingTier != 0 ||
393		q.QueryConfig.TimePartitioning != nil ||
394		q.QueryConfig.RangePartitioning != nil ||
395		q.QueryConfig.Clustering != nil ||
396		q.QueryConfig.DestinationEncryptionConfig != nil ||
397		q.QueryConfig.SchemaUpdateOptions != nil ||
398		// User has defined the jobID generation behavior
399		q.JobIDConfig.JobID != "" {
400		return nil, fmt.Errorf("QueryConfig incompatible with fastPath")
401	}
402	pfalse := false
403	qRequest := &bq.QueryRequest{
404		Query:              q.QueryConfig.Q,
405		Location:           q.Location,
406		UseLegacySql:       &pfalse,
407		MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
408		RequestId:          uid.NewSpace("request", nil).New(),
409		Labels:             q.Labels,
410	}
411	if q.QueryConfig.DisableQueryCache {
412		qRequest.UseQueryCache = &pfalse
413	}
414	// Convert query parameters
415	for _, p := range q.QueryConfig.Parameters {
416		qp, err := p.toBQ()
417		if err != nil {
418			return nil, err
419		}
420		qRequest.QueryParameters = append(qRequest.QueryParameters, qp)
421	}
422	if q.QueryConfig.DefaultDatasetID != "" {
423		qRequest.DefaultDataset = &bq.DatasetReference{
424			ProjectId: q.QueryConfig.DefaultProjectID,
425			DatasetId: q.QueryConfig.DefaultDatasetID,
426		}
427	}
428	return qRequest, nil
429}
430