1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "errors" 20 21 "cloud.google.com/go/internal/trace" 22 bq "google.golang.org/api/bigquery/v2" 23) 24 25// QueryConfig holds the configuration for a query job. 26type QueryConfig struct { 27 // Dst is the table into which the results of the query will be written. 28 // If this field is nil, a temporary table will be created. 29 Dst *Table 30 31 // The query to execute. See https://cloud.google.com/bigquery/query-reference for details. 32 Q string 33 34 // DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query. 35 // If DefaultProjectID is set, DefaultDatasetID must also be set. 36 DefaultProjectID string 37 DefaultDatasetID string 38 39 // TableDefinitions describes data sources outside of BigQuery. 40 // The map keys may be used as table names in the query string. 41 // 42 // When a QueryConfig is returned from Job.Config, the map values 43 // are always of type *ExternalDataConfig. 44 TableDefinitions map[string]ExternalData 45 46 // CreateDisposition specifies the circumstances under which the destination table will be created. 47 // The default is CreateIfNeeded. 48 CreateDisposition TableCreateDisposition 49 50 // WriteDisposition specifies how existing data in the destination table is treated. 51 // The default is WriteEmpty. 52 WriteDisposition TableWriteDisposition 53 54 // DisableQueryCache prevents results being fetched from the query cache. 55 // If this field is false, results are fetched from the cache if they are available. 56 // The query cache is a best-effort cache that is flushed whenever tables in the query are modified. 57 // Cached results are only available when TableID is unspecified in the query's destination Table. 58 // For more information, see https://cloud.google.com/bigquery/querying-data#querycaching 59 DisableQueryCache bool 60 61 // DisableFlattenedResults prevents results being flattened. 62 // If this field is false, results from nested and repeated fields are flattened. 63 // DisableFlattenedResults implies AllowLargeResults 64 // For more information, see https://cloud.google.com/bigquery/docs/data#nested 65 DisableFlattenedResults bool 66 67 // AllowLargeResults allows the query to produce arbitrarily large result tables. 68 // The destination must be a table. 69 // When using this option, queries will take longer to execute, even if the result set is small. 70 // For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults 71 AllowLargeResults bool 72 73 // Priority specifies the priority with which to schedule the query. 74 // The default priority is InteractivePriority. 75 // For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries 76 Priority QueryPriority 77 78 // MaxBillingTier sets the maximum billing tier for a Query. 79 // Queries that have resource usage beyond this tier will fail (without 80 // incurring a charge). If this field is zero, the project default will be used. 81 MaxBillingTier int 82 83 // MaxBytesBilled limits the number of bytes billed for 84 // this job. Queries that would exceed this limit will fail (without incurring 85 // a charge). 86 // If this field is less than 1, the project default will be 87 // used. 88 MaxBytesBilled int64 89 90 // UseStandardSQL causes the query to use standard SQL. The default. 91 // Deprecated: use UseLegacySQL. 92 UseStandardSQL bool 93 94 // UseLegacySQL causes the query to use legacy SQL. 95 UseLegacySQL bool 96 97 // Parameters is a list of query parameters. The presence of parameters 98 // implies the use of standard SQL. 99 // If the query uses positional syntax ("?"), then no parameter may have a name. 100 // If the query uses named syntax ("@p"), then all parameters must have names. 101 // It is illegal to mix positional and named syntax. 102 Parameters []QueryParameter 103 104 // TimePartitioning specifies time-based partitioning 105 // for the destination table. 106 TimePartitioning *TimePartitioning 107 108 // RangePartitioning specifies integer range-based partitioning 109 // for the destination table. 110 RangePartitioning *RangePartitioning 111 112 // Clustering specifies the data clustering configuration for the destination table. 113 Clustering *Clustering 114 115 // The labels associated with this job. 116 Labels map[string]string 117 118 // If true, don't actually run this job. A valid query will return a mostly 119 // empty response with some processing statistics, while an invalid query will 120 // return the same error it would if it wasn't a dry run. 121 // 122 // Query.Read will fail with dry-run queries. Call Query.Run instead, and then 123 // call LastStatus on the returned job to get statistics. Calling Status on a 124 // dry-run job will fail. 125 DryRun bool 126 127 // Custom encryption configuration (e.g., Cloud KMS keys). 128 DestinationEncryptionConfig *EncryptionConfig 129 130 // Allows the schema of the destination table to be updated as a side effect of 131 // the query job. 132 SchemaUpdateOptions []string 133} 134 135func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { 136 qconf := &bq.JobConfigurationQuery{ 137 Query: qc.Q, 138 CreateDisposition: string(qc.CreateDisposition), 139 WriteDisposition: string(qc.WriteDisposition), 140 AllowLargeResults: qc.AllowLargeResults, 141 Priority: string(qc.Priority), 142 MaximumBytesBilled: qc.MaxBytesBilled, 143 TimePartitioning: qc.TimePartitioning.toBQ(), 144 RangePartitioning: qc.RangePartitioning.toBQ(), 145 Clustering: qc.Clustering.toBQ(), 146 DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(), 147 SchemaUpdateOptions: qc.SchemaUpdateOptions, 148 } 149 if len(qc.TableDefinitions) > 0 { 150 qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration) 151 } 152 for name, data := range qc.TableDefinitions { 153 qconf.TableDefinitions[name] = data.toBQ() 154 } 155 if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" { 156 qconf.DefaultDataset = &bq.DatasetReference{ 157 DatasetId: qc.DefaultDatasetID, 158 ProjectId: qc.DefaultProjectID, 159 } 160 } 161 if tier := int64(qc.MaxBillingTier); tier > 0 { 162 qconf.MaximumBillingTier = &tier 163 } 164 f := false 165 if qc.DisableQueryCache { 166 qconf.UseQueryCache = &f 167 } 168 if qc.DisableFlattenedResults { 169 qconf.FlattenResults = &f 170 // DisableFlattenResults implies AllowLargeResults. 171 qconf.AllowLargeResults = true 172 } 173 if qc.UseStandardSQL && qc.UseLegacySQL { 174 return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL") 175 } 176 if len(qc.Parameters) > 0 && qc.UseLegacySQL { 177 return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL") 178 } 179 ptrue := true 180 pfalse := false 181 if qc.UseLegacySQL { 182 qconf.UseLegacySql = &ptrue 183 } else { 184 qconf.UseLegacySql = &pfalse 185 } 186 if qc.Dst != nil && !qc.Dst.implicitTable() { 187 qconf.DestinationTable = qc.Dst.toBQ() 188 } 189 for _, p := range qc.Parameters { 190 qp, err := p.toBQ() 191 if err != nil { 192 return nil, err 193 } 194 qconf.QueryParameters = append(qconf.QueryParameters, qp) 195 } 196 return &bq.JobConfiguration{ 197 Labels: qc.Labels, 198 DryRun: qc.DryRun, 199 Query: qconf, 200 }, nil 201} 202 203func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) { 204 qq := q.Query 205 qc := &QueryConfig{ 206 Labels: q.Labels, 207 DryRun: q.DryRun, 208 Q: qq.Query, 209 CreateDisposition: TableCreateDisposition(qq.CreateDisposition), 210 WriteDisposition: TableWriteDisposition(qq.WriteDisposition), 211 AllowLargeResults: qq.AllowLargeResults, 212 Priority: QueryPriority(qq.Priority), 213 MaxBytesBilled: qq.MaximumBytesBilled, 214 UseLegacySQL: qq.UseLegacySql == nil || *qq.UseLegacySql, 215 TimePartitioning: bqToTimePartitioning(qq.TimePartitioning), 216 RangePartitioning: bqToRangePartitioning(qq.RangePartitioning), 217 Clustering: bqToClustering(qq.Clustering), 218 DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration), 219 SchemaUpdateOptions: qq.SchemaUpdateOptions, 220 } 221 qc.UseStandardSQL = !qc.UseLegacySQL 222 223 if len(qq.TableDefinitions) > 0 { 224 qc.TableDefinitions = make(map[string]ExternalData) 225 } 226 for name, qedc := range qq.TableDefinitions { 227 edc, err := bqToExternalDataConfig(&qedc) 228 if err != nil { 229 return nil, err 230 } 231 qc.TableDefinitions[name] = edc 232 } 233 if qq.DefaultDataset != nil { 234 qc.DefaultProjectID = qq.DefaultDataset.ProjectId 235 qc.DefaultDatasetID = qq.DefaultDataset.DatasetId 236 } 237 if qq.MaximumBillingTier != nil { 238 qc.MaxBillingTier = int(*qq.MaximumBillingTier) 239 } 240 if qq.UseQueryCache != nil && !*qq.UseQueryCache { 241 qc.DisableQueryCache = true 242 } 243 if qq.FlattenResults != nil && !*qq.FlattenResults { 244 qc.DisableFlattenedResults = true 245 } 246 if qq.DestinationTable != nil { 247 qc.Dst = bqToTable(qq.DestinationTable, c) 248 } 249 for _, qp := range qq.QueryParameters { 250 p, err := bqToQueryParameter(qp) 251 if err != nil { 252 return nil, err 253 } 254 qc.Parameters = append(qc.Parameters, p) 255 } 256 return qc, nil 257} 258 259// QueryPriority specifies a priority with which a query is to be executed. 260type QueryPriority string 261 262const ( 263 // BatchPriority specifies that the query should be scheduled with the 264 // batch priority. BigQuery queues each batch query on your behalf, and 265 // starts the query as soon as idle resources are available, usually within 266 // a few minutes. If BigQuery hasn't started the query within 24 hours, 267 // BigQuery changes the job priority to interactive. Batch queries don't 268 // count towards your concurrent rate limit, which can make it easier to 269 // start many queries at once. 270 // 271 // More information can be found at https://cloud.google.com/bigquery/docs/running-queries#batchqueries. 272 BatchPriority QueryPriority = "BATCH" 273 // InteractivePriority specifies that the query should be scheduled with 274 // interactive priority, which means that the query is executed as soon as 275 // possible. Interactive queries count towards your concurrent rate limit 276 // and your daily limit. It is the default priority with which queries get 277 // executed. 278 // 279 // More information can be found at https://cloud.google.com/bigquery/docs/running-queries#queries. 280 InteractivePriority QueryPriority = "INTERACTIVE" 281) 282 283// A Query queries data from a BigQuery table. Use Client.Query to create a Query. 284type Query struct { 285 JobIDConfig 286 QueryConfig 287 client *Client 288} 289 290// Query creates a query with string q. 291// The returned Query may optionally be further configured before its Run method is called. 292func (c *Client) Query(q string) *Query { 293 return &Query{ 294 client: c, 295 QueryConfig: QueryConfig{Q: q}, 296 } 297} 298 299// Run initiates a query job. 300func (q *Query) Run(ctx context.Context) (j *Job, err error) { 301 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run") 302 defer func() { trace.EndSpan(ctx, err) }() 303 304 job, err := q.newJob() 305 if err != nil { 306 return nil, err 307 } 308 j, err = q.client.insertJob(ctx, job, nil) 309 if err != nil { 310 return nil, err 311 } 312 return j, nil 313} 314 315func (q *Query) newJob() (*bq.Job, error) { 316 config, err := q.QueryConfig.toBQ() 317 if err != nil { 318 return nil, err 319 } 320 return &bq.Job{ 321 JobReference: q.JobIDConfig.createJobRef(q.client), 322 Configuration: config, 323 }, nil 324} 325 326// Read submits a query for execution and returns the results via a RowIterator. 327// It is a shorthand for Query.Run followed by Job.Read. 328func (q *Query) Read(ctx context.Context) (*RowIterator, error) { 329 job, err := q.Run(ctx) 330 if err != nil { 331 return nil, err 332 } 333 return job.Read(ctx) 334} 335