1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "io" 20 21 "cloud.google.com/go/internal/trace" 22 bq "google.golang.org/api/bigquery/v2" 23) 24 25// LoadConfig holds the configuration for a load job. 26type LoadConfig struct { 27 // Src is the source from which data will be loaded. 28 Src LoadSource 29 30 // Dst is the table into which the data will be loaded. 31 Dst *Table 32 33 // CreateDisposition specifies the circumstances under which the destination table will be created. 34 // The default is CreateIfNeeded. 35 CreateDisposition TableCreateDisposition 36 37 // WriteDisposition specifies how existing data in the destination table is treated. 38 // The default is WriteAppend. 39 WriteDisposition TableWriteDisposition 40 41 // The labels associated with this job. 42 Labels map[string]string 43 44 // If non-nil, the destination table is partitioned by time. 45 TimePartitioning *TimePartitioning 46 47 // Clustering specifies the data clustering configuration for the destination table. 48 Clustering *Clustering 49 50 // Custom encryption configuration (e.g., Cloud KMS keys). 51 DestinationEncryptionConfig *EncryptionConfig 52 53 // Allows the schema of the destination table to be updated as a side effect of 54 // the load job. 55 SchemaUpdateOptions []string 56 57 // For Avro-based loads, controls whether logical type annotations are used. 58 // See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types 59 // for additional information. 60 UseAvroLogicalTypes bool 61} 62 63func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) { 64 config := &bq.JobConfiguration{ 65 Labels: l.Labels, 66 Load: &bq.JobConfigurationLoad{ 67 CreateDisposition: string(l.CreateDisposition), 68 WriteDisposition: string(l.WriteDisposition), 69 DestinationTable: l.Dst.toBQ(), 70 TimePartitioning: l.TimePartitioning.toBQ(), 71 Clustering: l.Clustering.toBQ(), 72 DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(), 73 SchemaUpdateOptions: l.SchemaUpdateOptions, 74 UseAvroLogicalTypes: l.UseAvroLogicalTypes, 75 }, 76 } 77 media := l.Src.populateLoadConfig(config.Load) 78 return config, media 79} 80 81func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig { 82 lc := &LoadConfig{ 83 Labels: q.Labels, 84 CreateDisposition: TableCreateDisposition(q.Load.CreateDisposition), 85 WriteDisposition: TableWriteDisposition(q.Load.WriteDisposition), 86 Dst: bqToTable(q.Load.DestinationTable, c), 87 TimePartitioning: bqToTimePartitioning(q.Load.TimePartitioning), 88 Clustering: bqToClustering(q.Load.Clustering), 89 DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration), 90 SchemaUpdateOptions: q.Load.SchemaUpdateOptions, 91 UseAvroLogicalTypes: q.Load.UseAvroLogicalTypes, 92 } 93 var fc *FileConfig 94 if len(q.Load.SourceUris) == 0 { 95 s := NewReaderSource(nil) 96 fc = &s.FileConfig 97 lc.Src = s 98 } else { 99 s := NewGCSReference(q.Load.SourceUris...) 100 fc = &s.FileConfig 101 lc.Src = s 102 } 103 bqPopulateFileConfig(q.Load, fc) 104 return lc 105} 106 107// A Loader loads data from Google Cloud Storage into a BigQuery table. 108type Loader struct { 109 JobIDConfig 110 LoadConfig 111 c *Client 112} 113 114// A LoadSource represents a source of data that can be loaded into 115// a BigQuery table. 116// 117// This package defines two LoadSources: GCSReference, for Google Cloud Storage 118// objects, and ReaderSource, for data read from an io.Reader. 119type LoadSource interface { 120 // populates config, returns media 121 populateLoadConfig(*bq.JobConfigurationLoad) io.Reader 122} 123 124// LoaderFrom returns a Loader which can be used to load data into a BigQuery table. 125// The returned Loader may optionally be further configured before its Run method is called. 126// See GCSReference and ReaderSource for additional configuration options that 127// affect loading. 128func (t *Table) LoaderFrom(src LoadSource) *Loader { 129 return &Loader{ 130 c: t.c, 131 LoadConfig: LoadConfig{ 132 Src: src, 133 Dst: t, 134 }, 135 } 136} 137 138// Run initiates a load job. 139func (l *Loader) Run(ctx context.Context) (j *Job, err error) { 140 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Load.Run") 141 defer func() { trace.EndSpan(ctx, err) }() 142 143 job, media := l.newJob() 144 return l.c.insertJob(ctx, job, media) 145} 146 147func (l *Loader) newJob() (*bq.Job, io.Reader) { 148 config, media := l.LoadConfig.toBQ() 149 return &bq.Job{ 150 JobReference: l.JobIDConfig.createJobRef(l.c), 151 Configuration: config, 152 }, media 153} 154