// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"

	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)

// QueryConfig holds the configuration for a query job.
type QueryConfig struct {
	// Dst is the table into which the results of the query will be written.
	// If this field is nil, a temporary table will be created.
	Dst *Table

	// The query to execute. See https://cloud.google.com/bigquery/query-reference for details.
	Q string

	// DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query.
	// If DefaultProjectID is set, DefaultDatasetID must also be set.
	DefaultProjectID string
	DefaultDatasetID string

	// TableDefinitions describes data sources outside of BigQuery.
	// The map keys may be used as table names in the query string.
	//
	// When a QueryConfig is returned from Job.Config, the map values
	// are always of type *ExternalDataConfig.
	TableDefinitions map[string]ExternalData

	// CreateDisposition specifies the circumstances under which the destination table will be created.
	// The default is CreateIfNeeded.
	CreateDisposition TableCreateDisposition

	// WriteDisposition specifies how existing data in the destination table is treated.
	// The default is WriteEmpty.
	WriteDisposition TableWriteDisposition

	// DisableQueryCache prevents results being fetched from the query cache.
	// If this field is false, results are fetched from the cache if they are available.
	// The query cache is a best-effort cache that is flushed whenever tables in the query are modified.
	// Cached results are only available when TableID is unspecified in the query's destination Table.
	// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching
	DisableQueryCache bool

	// DisableFlattenedResults prevents results being flattened.
	// If this field is false, results from nested and repeated fields are flattened.
	// DisableFlattenedResults implies AllowLargeResults
	// For more information, see https://cloud.google.com/bigquery/docs/data#nested
	DisableFlattenedResults bool

	// AllowLargeResults allows the query to produce arbitrarily large result tables.
	// The destination must be a table.
	// When using this option, queries will take longer to execute, even if the result set is small.
	// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults
	AllowLargeResults bool

	// Priority specifies the priority with which to schedule the query.
	// The default priority is InteractivePriority.
	// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries
	Priority QueryPriority

	// MaxBillingTier sets the maximum billing tier for a Query.
	// Queries that have resource usage beyond this tier will fail (without
	// incurring a charge). If this field is zero, the project default will be used.
	MaxBillingTier int

	// MaxBytesBilled limits the number of bytes billed for
	// this job. Queries that would exceed this limit will fail (without incurring
	// a charge).
	// If this field is less than 1, the project default will be
	// used.
	MaxBytesBilled int64

	// UseStandardSQL causes the query to use standard SQL. The default.
	//
	// Deprecated: use UseLegacySQL.
	UseStandardSQL bool

	// UseLegacySQL causes the query to use legacy SQL.
	UseLegacySQL bool

	// Parameters is a list of query parameters. The presence of parameters
	// implies the use of standard SQL.
	// If the query uses positional syntax ("?"), then no parameter may have a name.
	// If the query uses named syntax ("@p"), then all parameters must have names.
	// It is illegal to mix positional and named syntax.
	Parameters []QueryParameter

	// TimePartitioning specifies time-based partitioning
	// for the destination table.
	TimePartitioning *TimePartitioning

	// Clustering specifies the data clustering configuration for the destination table.
	Clustering *Clustering

	// The labels associated with this job.
	Labels map[string]string

	// If true, don't actually run this job. A valid query will return a mostly
	// empty response with some processing statistics, while an invalid query will
	// return the same error it would if it wasn't a dry run.
	//
	// Query.Read will fail with dry-run queries. Call Query.Run instead, and then
	// call LastStatus on the returned job to get statistics. Calling Status on a
	// dry-run job will fail.
	DryRun bool

	// Custom encryption configuration (e.g., Cloud KMS keys).
	DestinationEncryptionConfig *EncryptionConfig

	// Allows the schema of the destination table to be updated as a side effect of
	// the query job.
	SchemaUpdateOptions []string
}

// toBQ converts the QueryConfig into the wire-format job configuration used
// by the BigQuery v2 API. It returns an error if mutually exclusive options
// are both set: UseStandardSQL together with UseLegacySQL, or Parameters
// (which imply standard SQL) together with UseLegacySQL.
func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
	qconf := &bq.JobConfigurationQuery{
		Query:                              qc.Q,
		CreateDisposition:                  string(qc.CreateDisposition),
		WriteDisposition:                   string(qc.WriteDisposition),
		AllowLargeResults:                  qc.AllowLargeResults,
		Priority:                           string(qc.Priority),
		MaximumBytesBilled:                 qc.MaxBytesBilled,
		TimePartitioning:                   qc.TimePartitioning.toBQ(),
		Clustering:                         qc.Clustering.toBQ(),
		DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
		SchemaUpdateOptions:                qc.SchemaUpdateOptions,
	}
	if len(qc.TableDefinitions) > 0 {
		qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
	}
	for name, data := range qc.TableDefinitions {
		qconf.TableDefinitions[name] = data.toBQ()
	}
	if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" {
		qconf.DefaultDataset = &bq.DatasetReference{
			DatasetId: qc.DefaultDatasetID,
			ProjectId: qc.DefaultProjectID,
		}
	}
	if tier := int64(qc.MaxBillingTier); tier > 0 {
		qconf.MaximumBillingTier = &tier
	}
	// The API uses *bool so that "unset" is distinct from "false"; only send
	// an explicit false when the caller asked to disable the behavior.
	f := false
	if qc.DisableQueryCache {
		qconf.UseQueryCache = &f
	}
	if qc.DisableFlattenedResults {
		qconf.FlattenResults = &f
		// DisableFlattenResults implies AllowLargeResults.
		qconf.AllowLargeResults = true
	}
	if qc.UseStandardSQL && qc.UseLegacySQL {
		return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
	}
	if len(qc.Parameters) > 0 && qc.UseLegacySQL {
		return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL")
	}
	// Always send an explicit dialect choice rather than leaving the field unset.
	ptrue := true
	pfalse := false
	if qc.UseLegacySQL {
		qconf.UseLegacySql = &ptrue
	} else {
		qconf.UseLegacySql = &pfalse
	}
	if qc.Dst != nil && !qc.Dst.implicitTable() {
		qconf.DestinationTable = qc.Dst.toBQ()
	}
	for _, p := range qc.Parameters {
		qp, err := p.toBQ()
		if err != nil {
			return nil, err
		}
		qconf.QueryParameters = append(qconf.QueryParameters, qp)
	}
	return &bq.JobConfiguration{
		Labels: qc.Labels,
		DryRun: qc.DryRun,
		Query:  qconf,
	}, nil
}

// bqToQueryConfig converts a wire-format job configuration received from the
// BigQuery v2 API back into a QueryConfig. It is the inverse of
// (*QueryConfig).toBQ.
func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
	qq := q.Query
	qc := &QueryConfig{
		Labels:            q.Labels,
		DryRun:            q.DryRun,
		Q:                 qq.Query,
		CreateDisposition: TableCreateDisposition(qq.CreateDisposition),
		WriteDisposition:  TableWriteDisposition(qq.WriteDisposition),
		AllowLargeResults: qq.AllowLargeResults,
		Priority:          QueryPriority(qq.Priority),
		MaxBytesBilled:    qq.MaximumBytesBilled,
		// An unset UseLegacySql on the API side is treated as legacy SQL.
		UseLegacySQL:                qq.UseLegacySql == nil || *qq.UseLegacySql,
		TimePartitioning:            bqToTimePartitioning(qq.TimePartitioning),
		Clustering:                  bqToClustering(qq.Clustering),
		DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
		SchemaUpdateOptions:         qq.SchemaUpdateOptions,
	}
	// Keep the deprecated UseStandardSQL field consistent with UseLegacySQL.
	qc.UseStandardSQL = !qc.UseLegacySQL

	if len(qq.TableDefinitions) > 0 {
		qc.TableDefinitions = make(map[string]ExternalData)
	}
	for name, qedc := range qq.TableDefinitions {
		edc, err := bqToExternalDataConfig(&qedc)
		if err != nil {
			return nil, err
		}
		qc.TableDefinitions[name] = edc
	}
	if qq.DefaultDataset != nil {
		qc.DefaultProjectID = qq.DefaultDataset.ProjectId
		qc.DefaultDatasetID = qq.DefaultDataset.DatasetId
	}
	if qq.MaximumBillingTier != nil {
		qc.MaxBillingTier = int(*qq.MaximumBillingTier)
	}
	// A *bool distinguishes unset from false; only an explicit false maps to
	// the corresponding Disable* field.
	if qq.UseQueryCache != nil && !*qq.UseQueryCache {
		qc.DisableQueryCache = true
	}
	if qq.FlattenResults != nil && !*qq.FlattenResults {
		qc.DisableFlattenedResults = true
	}
	if qq.DestinationTable != nil {
		qc.Dst = bqToTable(qq.DestinationTable, c)
	}
	for _, qp := range qq.QueryParameters {
		p, err := bqToQueryParameter(qp)
		if err != nil {
			return nil, err
		}
		qc.Parameters = append(qc.Parameters, p)
	}
	return qc, nil
}

// QueryPriority specifies a priority with which a query is to be executed.
type QueryPriority string

const (
	// BatchPriority specifies that the query should be scheduled with the
	// batch priority. BigQuery queues each batch query on your behalf, and
	// starts the query as soon as idle resources are available, usually within
	// a few minutes. If BigQuery hasn't started the query within 24 hours,
	// BigQuery changes the job priority to interactive. Batch queries don't
	// count towards your concurrent rate limit, which can make it easier to
	// start many queries at once.
	//
	// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#batchqueries.
	BatchPriority QueryPriority = "BATCH"
	// InteractivePriority specifies that the query should be scheduled with
	// interactive priority, which means that the query is executed as soon as
	// possible. Interactive queries count towards your concurrent rate limit
	// and your daily limit. It is the default priority with which queries get
	// executed.
	//
	// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#queries.
	InteractivePriority QueryPriority = "INTERACTIVE"
)

// A Query queries data from a BigQuery table. Use Client.Query to create a Query.
type Query struct {
	JobIDConfig
	QueryConfig
	client *Client
}

// Query creates a query with string q.
// The returned Query may optionally be further configured before its Run method is called.
func (c *Client) Query(q string) *Query {
	return &Query{
		client:      c,
		QueryConfig: QueryConfig{Q: q},
	}
}

// Run initiates a query job.
func (q *Query) Run(ctx context.Context) (j *Job, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
	defer func() { trace.EndSpan(ctx, err) }()

	job, err := q.newJob()
	if err != nil {
		return nil, err
	}
	j, err = q.client.insertJob(ctx, job, nil)
	if err != nil {
		return nil, err
	}
	return j, nil
}

// newJob builds the wire-format job (reference plus configuration) that Run
// submits to the service.
func (q *Query) newJob() (*bq.Job, error) {
	config, err := q.QueryConfig.toBQ()
	if err != nil {
		return nil, err
	}
	return &bq.Job{
		JobReference:  q.JobIDConfig.createJobRef(q.client),
		Configuration: config,
	}, nil
}

// Read submits a query for execution and returns the results via a RowIterator.
// It is a shorthand for Query.Run followed by Job.Read.
func (q *Query) Read(ctx context.Context) (*RowIterator, error) {
	job, err := q.Run(ctx)
	if err != nil {
		return nil, err
	}
	return job.Read(ctx)
}