1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 22 "cloud.google.com/go/internal/trace" 23 "cloud.google.com/go/internal/uid" 24 bq "google.golang.org/api/bigquery/v2" 25) 26 27// QueryConfig holds the configuration for a query job. 28type QueryConfig struct { 29 // Dst is the table into which the results of the query will be written. 30 // If this field is nil, a temporary table will be created. 31 Dst *Table 32 33 // The query to execute. See https://cloud.google.com/bigquery/query-reference for details. 34 Q string 35 36 // DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query. 37 // If DefaultProjectID is set, DefaultDatasetID must also be set. 38 DefaultProjectID string 39 DefaultDatasetID string 40 41 // TableDefinitions describes data sources outside of BigQuery. 42 // The map keys may be used as table names in the query string. 43 // 44 // When a QueryConfig is returned from Job.Config, the map values 45 // are always of type *ExternalDataConfig. 46 TableDefinitions map[string]ExternalData 47 48 // CreateDisposition specifies the circumstances under which the destination table will be created. 49 // The default is CreateIfNeeded. 50 CreateDisposition TableCreateDisposition 51 52 // WriteDisposition specifies how existing data in the destination table is treated. 53 // The default is WriteEmpty. 54 WriteDisposition TableWriteDisposition 55 56 // DisableQueryCache prevents results being fetched from the query cache. 57 // If this field is false, results are fetched from the cache if they are available. 58 // The query cache is a best-effort cache that is flushed whenever tables in the query are modified. 59 // Cached results are only available when TableID is unspecified in the query's destination Table. 60 // For more information, see https://cloud.google.com/bigquery/querying-data#querycaching 61 DisableQueryCache bool 62 63 // DisableFlattenedResults prevents results being flattened. 64 // If this field is false, results from nested and repeated fields are flattened. 65 // DisableFlattenedResults implies AllowLargeResults 66 // For more information, see https://cloud.google.com/bigquery/docs/data#nested 67 DisableFlattenedResults bool 68 69 // AllowLargeResults allows the query to produce arbitrarily large result tables. 70 // The destination must be a table. 71 // When using this option, queries will take longer to execute, even if the result set is small. 72 // For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults 73 AllowLargeResults bool 74 75 // Priority specifies the priority with which to schedule the query. 76 // The default priority is InteractivePriority. 77 // For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries 78 Priority QueryPriority 79 80 // MaxBillingTier sets the maximum billing tier for a Query. 81 // Queries that have resource usage beyond this tier will fail (without 82 // incurring a charge). If this field is zero, the project default will be used. 83 MaxBillingTier int 84 85 // MaxBytesBilled limits the number of bytes billed for 86 // this job. Queries that would exceed this limit will fail (without incurring 87 // a charge). 88 // If this field is less than 1, the project default will be 89 // used. 90 MaxBytesBilled int64 91 92 // UseStandardSQL causes the query to use standard SQL. The default. 93 // Deprecated: use UseLegacySQL. 94 UseStandardSQL bool 95 96 // UseLegacySQL causes the query to use legacy SQL. 97 UseLegacySQL bool 98 99 // Parameters is a list of query parameters. The presence of parameters 100 // implies the use of standard SQL. 101 // If the query uses positional syntax ("?"), then no parameter may have a name. 102 // If the query uses named syntax ("@p"), then all parameters must have names. 103 // It is illegal to mix positional and named syntax. 104 Parameters []QueryParameter 105 106 // TimePartitioning specifies time-based partitioning 107 // for the destination table. 108 TimePartitioning *TimePartitioning 109 110 // RangePartitioning specifies integer range-based partitioning 111 // for the destination table. 112 RangePartitioning *RangePartitioning 113 114 // Clustering specifies the data clustering configuration for the destination table. 115 Clustering *Clustering 116 117 // The labels associated with this job. 118 Labels map[string]string 119 120 // If true, don't actually run this job. A valid query will return a mostly 121 // empty response with some processing statistics, while an invalid query will 122 // return the same error it would if it wasn't a dry run. 123 // 124 // Query.Read will fail with dry-run queries. Call Query.Run instead, and then 125 // call LastStatus on the returned job to get statistics. Calling Status on a 126 // dry-run job will fail. 127 DryRun bool 128 129 // Custom encryption configuration (e.g., Cloud KMS keys). 130 DestinationEncryptionConfig *EncryptionConfig 131 132 // Allows the schema of the destination table to be updated as a side effect of 133 // the query job. 134 SchemaUpdateOptions []string 135} 136 137func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { 138 qconf := &bq.JobConfigurationQuery{ 139 Query: qc.Q, 140 CreateDisposition: string(qc.CreateDisposition), 141 WriteDisposition: string(qc.WriteDisposition), 142 AllowLargeResults: qc.AllowLargeResults, 143 Priority: string(qc.Priority), 144 MaximumBytesBilled: qc.MaxBytesBilled, 145 TimePartitioning: qc.TimePartitioning.toBQ(), 146 RangePartitioning: qc.RangePartitioning.toBQ(), 147 Clustering: qc.Clustering.toBQ(), 148 DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(), 149 SchemaUpdateOptions: qc.SchemaUpdateOptions, 150 } 151 if len(qc.TableDefinitions) > 0 { 152 qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration) 153 } 154 for name, data := range qc.TableDefinitions { 155 qconf.TableDefinitions[name] = data.toBQ() 156 } 157 if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" { 158 qconf.DefaultDataset = &bq.DatasetReference{ 159 DatasetId: qc.DefaultDatasetID, 160 ProjectId: qc.DefaultProjectID, 161 } 162 } 163 if tier := int64(qc.MaxBillingTier); tier > 0 { 164 qconf.MaximumBillingTier = &tier 165 } 166 f := false 167 if qc.DisableQueryCache { 168 qconf.UseQueryCache = &f 169 } 170 if qc.DisableFlattenedResults { 171 qconf.FlattenResults = &f 172 // DisableFlattenResults implies AllowLargeResults. 173 qconf.AllowLargeResults = true 174 } 175 if qc.UseStandardSQL && qc.UseLegacySQL { 176 return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL") 177 } 178 if len(qc.Parameters) > 0 && qc.UseLegacySQL { 179 return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL") 180 } 181 ptrue := true 182 pfalse := false 183 if qc.UseLegacySQL { 184 qconf.UseLegacySql = &ptrue 185 } else { 186 qconf.UseLegacySql = &pfalse 187 } 188 if qc.Dst != nil && !qc.Dst.implicitTable() { 189 qconf.DestinationTable = qc.Dst.toBQ() 190 } 191 for _, p := range qc.Parameters { 192 qp, err := p.toBQ() 193 if err != nil { 194 return nil, err 195 } 196 qconf.QueryParameters = append(qconf.QueryParameters, qp) 197 } 198 return &bq.JobConfiguration{ 199 Labels: qc.Labels, 200 DryRun: qc.DryRun, 201 Query: qconf, 202 }, nil 203} 204 205func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) { 206 qq := q.Query 207 qc := &QueryConfig{ 208 Labels: q.Labels, 209 DryRun: q.DryRun, 210 Q: qq.Query, 211 CreateDisposition: TableCreateDisposition(qq.CreateDisposition), 212 WriteDisposition: TableWriteDisposition(qq.WriteDisposition), 213 AllowLargeResults: qq.AllowLargeResults, 214 Priority: QueryPriority(qq.Priority), 215 MaxBytesBilled: qq.MaximumBytesBilled, 216 UseLegacySQL: qq.UseLegacySql == nil || *qq.UseLegacySql, 217 TimePartitioning: bqToTimePartitioning(qq.TimePartitioning), 218 RangePartitioning: bqToRangePartitioning(qq.RangePartitioning), 219 Clustering: bqToClustering(qq.Clustering), 220 DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration), 221 SchemaUpdateOptions: qq.SchemaUpdateOptions, 222 } 223 qc.UseStandardSQL = !qc.UseLegacySQL 224 225 if len(qq.TableDefinitions) > 0 { 226 qc.TableDefinitions = make(map[string]ExternalData) 227 } 228 for name, qedc := range qq.TableDefinitions { 229 edc, err := bqToExternalDataConfig(&qedc) 230 if err != nil { 231 return nil, err 232 } 233 qc.TableDefinitions[name] = edc 234 } 235 if qq.DefaultDataset != nil { 236 qc.DefaultProjectID = qq.DefaultDataset.ProjectId 237 qc.DefaultDatasetID = qq.DefaultDataset.DatasetId 238 } 239 if qq.MaximumBillingTier != nil { 240 qc.MaxBillingTier = int(*qq.MaximumBillingTier) 241 } 242 if qq.UseQueryCache != nil && !*qq.UseQueryCache { 243 qc.DisableQueryCache = true 244 } 245 if qq.FlattenResults != nil && !*qq.FlattenResults { 246 qc.DisableFlattenedResults = true 247 } 248 if qq.DestinationTable != nil { 249 qc.Dst = bqToTable(qq.DestinationTable, c) 250 } 251 for _, qp := range qq.QueryParameters { 252 p, err := bqToQueryParameter(qp) 253 if err != nil { 254 return nil, err 255 } 256 qc.Parameters = append(qc.Parameters, p) 257 } 258 return qc, nil 259} 260 261// QueryPriority specifies a priority with which a query is to be executed. 262type QueryPriority string 263 264const ( 265 // BatchPriority specifies that the query should be scheduled with the 266 // batch priority. BigQuery queues each batch query on your behalf, and 267 // starts the query as soon as idle resources are available, usually within 268 // a few minutes. If BigQuery hasn't started the query within 24 hours, 269 // BigQuery changes the job priority to interactive. Batch queries don't 270 // count towards your concurrent rate limit, which can make it easier to 271 // start many queries at once. 272 // 273 // More information can be found at https://cloud.google.com/bigquery/docs/running-queries#batchqueries. 274 BatchPriority QueryPriority = "BATCH" 275 // InteractivePriority specifies that the query should be scheduled with 276 // interactive priority, which means that the query is executed as soon as 277 // possible. Interactive queries count towards your concurrent rate limit 278 // and your daily limit. It is the default priority with which queries get 279 // executed. 280 // 281 // More information can be found at https://cloud.google.com/bigquery/docs/running-queries#queries. 282 InteractivePriority QueryPriority = "INTERACTIVE" 283) 284 285// A Query queries data from a BigQuery table. Use Client.Query to create a Query. 286type Query struct { 287 JobIDConfig 288 QueryConfig 289 client *Client 290} 291 292// Query creates a query with string q. 293// The returned Query may optionally be further configured before its Run method is called. 294func (c *Client) Query(q string) *Query { 295 return &Query{ 296 client: c, 297 QueryConfig: QueryConfig{Q: q}, 298 } 299} 300 301// Run initiates a query job. 302func (q *Query) Run(ctx context.Context) (j *Job, err error) { 303 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run") 304 defer func() { trace.EndSpan(ctx, err) }() 305 306 job, err := q.newJob() 307 if err != nil { 308 return nil, err 309 } 310 j, err = q.client.insertJob(ctx, job, nil) 311 if err != nil { 312 return nil, err 313 } 314 return j, nil 315} 316 317func (q *Query) newJob() (*bq.Job, error) { 318 config, err := q.QueryConfig.toBQ() 319 if err != nil { 320 return nil, err 321 } 322 return &bq.Job{ 323 JobReference: q.JobIDConfig.createJobRef(q.client), 324 Configuration: config, 325 }, nil 326} 327 328// Read submits a query for execution and returns the results via a RowIterator. 329// If the request can be satisfied by running using the optimized query path, it 330// is used in place of the jobs.insert path as this path does not expose a job 331// object. 332func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) { 333 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run") 334 defer func() { trace.EndSpan(ctx, err) }() 335 queryRequest, err := q.probeFastPath() 336 if err != nil { 337 // Any error means we fallback to the older mechanism. 338 job, err := q.Run(ctx) 339 if err != nil { 340 return nil, err 341 } 342 return job.Read(ctx) 343 } 344 // we have a config, run on fastPath. 345 resp, err := q.client.runQuery(ctx, queryRequest) 346 if err != nil { 347 return nil, err 348 } 349 // construct a minimal job for backing the row iterator. 350 minimalJob := &Job{ 351 c: q.client, 352 jobID: resp.JobReference.JobId, 353 location: resp.JobReference.Location, 354 projectID: resp.JobReference.ProjectId, 355 } 356 if resp.JobComplete { 357 rowSource := &rowSource{ 358 j: minimalJob, 359 // RowIterator can precache results from the iterator to save a lookup. 360 cachedRows: resp.Rows, 361 cachedSchema: resp.Schema, 362 cachedNextToken: resp.PageToken, 363 } 364 return newRowIterator(ctx, rowSource, fetchPage), nil 365 } 366 // We're on the fastPath, but we need to poll because the job is incomplete. 367 // Fallback to job-based Read(). 368 // 369 // (Issue 2937) In order to satisfy basic probing of the job in classic path, 370 // we need to supply additional config which is probed for presence, not contents. 371 // 372 minimalJob.config = &bq.JobConfiguration{ 373 Query: &bq.JobConfigurationQuery{}, 374 } 375 376 return minimalJob.Read(ctx) 377} 378 379// probeFastPath is used to attempt configuring a jobs.Query request based on a 380// user's Query configuration. If all the options set on the job are supported on the 381// faster query path, this method returns a QueryRequest suitable for execution. 382func (q *Query) probeFastPath() (*bq.QueryRequest, error) { 383 // This is a denylist of settings which prevent us from composing an equivalent 384 // bq.QueryRequest due to differences between configuration parameters accepted 385 // by jobs.insert vs jobs.query. 386 if q.QueryConfig.Dst != nil || 387 q.QueryConfig.TableDefinitions != nil || 388 q.QueryConfig.CreateDisposition != "" || 389 q.QueryConfig.WriteDisposition != "" || 390 !(q.QueryConfig.Priority == "" || q.QueryConfig.Priority == InteractivePriority) || 391 q.QueryConfig.UseLegacySQL || 392 q.QueryConfig.MaxBillingTier != 0 || 393 q.QueryConfig.TimePartitioning != nil || 394 q.QueryConfig.RangePartitioning != nil || 395 q.QueryConfig.Clustering != nil || 396 q.QueryConfig.DestinationEncryptionConfig != nil || 397 q.QueryConfig.SchemaUpdateOptions != nil || 398 // User has defined the jobID generation behavior 399 q.JobIDConfig.JobID != "" { 400 return nil, fmt.Errorf("QueryConfig incompatible with fastPath") 401 } 402 pfalse := false 403 qRequest := &bq.QueryRequest{ 404 Query: q.QueryConfig.Q, 405 Location: q.Location, 406 UseLegacySql: &pfalse, 407 MaximumBytesBilled: q.QueryConfig.MaxBytesBilled, 408 RequestId: uid.NewSpace("request", nil).New(), 409 Labels: q.Labels, 410 } 411 if q.QueryConfig.DisableQueryCache { 412 qRequest.UseQueryCache = &pfalse 413 } 414 // Convert query parameters 415 for _, p := range q.QueryConfig.Parameters { 416 qp, err := p.toBQ() 417 if err != nil { 418 return nil, err 419 } 420 qRequest.QueryParameters = append(qRequest.QueryParameters, qp) 421 } 422 if q.QueryConfig.DefaultDatasetID != "" { 423 qRequest.DefaultDataset = &bq.DatasetReference{ 424 ProjectId: q.QueryConfig.DefaultProjectID, 425 DatasetId: q.QueryConfig.DefaultDatasetID, 426 } 427 } 428 return qRequest, nil 429} 430