// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"io"

	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)

// LoadConfig holds the configuration for a load job.
type LoadConfig struct {
	// Src is the source from which data will be loaded.
	Src LoadSource

	// Dst is the table into which the data will be loaded.
	Dst *Table

	// CreateDisposition specifies the circumstances under which the destination table will be created.
	// The default is CreateIfNeeded.
	CreateDisposition TableCreateDisposition

	// WriteDisposition specifies how existing data in the destination table is treated.
	// The default is WriteAppend.
	WriteDisposition TableWriteDisposition

	// The labels associated with this job.
	Labels map[string]string

	// If non-nil, the destination table is partitioned by time.
	TimePartitioning *TimePartitioning

	// If non-nil, the destination table is partitioned by integer range.
	RangePartitioning *RangePartitioning

	// Clustering specifies the data clustering configuration for the destination table.
	Clustering *Clustering

	// Custom encryption configuration (e.g., Cloud KMS keys).
	DestinationEncryptionConfig *EncryptionConfig

	// Allows the schema of the destination table to be updated as a side effect of
	// the load job.
	SchemaUpdateOptions []string

	// For Avro-based loads, controls whether logical type annotations are used.
	// See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
	// for additional information.
	UseAvroLogicalTypes bool

	// For ingestion from datastore backups, ProjectionFields governs which fields
	// are projected from the backup. The default behavior projects all fields.
	ProjectionFields []string
}
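// Illustrative sketch only, not part of this package's API: a typical
// caller-side configuration of these fields through a Loader, which embeds
// LoadConfig. The client, dataset, table, bucket, and column names below are
// placeholders.
//
//	gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
//	gcsRef.SourceFormat = bigquery.CSV
//	gcsRef.SkipLeadingRows = 1
//	loader := client.Dataset("mydataset").Table("mytable").LoaderFrom(gcsRef)
//	loader.WriteDisposition = bigquery.WriteTruncate
//	loader.TimePartitioning = &bigquery.TimePartitioning{Field: "event_date"}
//	loader.Labels = map[string]string{"team": "analytics"}
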

// toBQ converts the LoadConfig to its BigQuery API representation and returns
// the media to upload, if the source supplies one (e.g. a ReaderSource).
func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
	config := &bq.JobConfiguration{
		Labels: l.Labels,
		Load: &bq.JobConfigurationLoad{
			CreateDisposition: string(l.CreateDisposition),
			WriteDisposition: string(l.WriteDisposition),
			DestinationTable: l.Dst.toBQ(),
			TimePartitioning: l.TimePartitioning.toBQ(),
			RangePartitioning: l.RangePartitioning.toBQ(),
			Clustering: l.Clustering.toBQ(),
			DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
			SchemaUpdateOptions: l.SchemaUpdateOptions,
			UseAvroLogicalTypes: l.UseAvroLogicalTypes,
			ProjectionFields: l.ProjectionFields,
		},
	}
	media := l.Src.populateLoadConfig(config.Load)
	return config, media
}

// bqToLoadConfig converts a BigQuery API job configuration back into a
// LoadConfig, reconstructing the source as a GCSReference when source URIs
// are present and as a ReaderSource otherwise.
func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
	lc := &LoadConfig{
		Labels: q.Labels,
		CreateDisposition: TableCreateDisposition(q.Load.CreateDisposition),
		WriteDisposition: TableWriteDisposition(q.Load.WriteDisposition),
		Dst: bqToTable(q.Load.DestinationTable, c),
		TimePartitioning: bqToTimePartitioning(q.Load.TimePartitioning),
		RangePartitioning: bqToRangePartitioning(q.Load.RangePartitioning),
		Clustering: bqToClustering(q.Load.Clustering),
		DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
		SchemaUpdateOptions: q.Load.SchemaUpdateOptions,
		UseAvroLogicalTypes: q.Load.UseAvroLogicalTypes,
		ProjectionFields: q.Load.ProjectionFields,
	}
	var fc *FileConfig
	if len(q.Load.SourceUris) == 0 {
		s := NewReaderSource(nil)
		fc = &s.FileConfig
		lc.Src = s
	} else {
		s := NewGCSReference(q.Load.SourceUris...)
		fc = &s.FileConfig
		lc.Src = s
	}
	bqPopulateFileConfig(q.Load, fc)
	return lc
}

// A Loader loads data from Google Cloud Storage or from an io.Reader into a
// BigQuery table.
type Loader struct {
	JobIDConfig
	LoadConfig
	c *Client
}

// A LoadSource represents a source of data that can be loaded into
// a BigQuery table.
//
// This package defines two LoadSources: GCSReference, for Google Cloud Storage
// objects, and ReaderSource, for data read from an io.Reader.
type LoadSource interface {
	// populateLoadConfig populates the load configuration and returns the
	// media to upload, if any.
	populateLoadConfig(*bq.JobConfigurationLoad) io.Reader
}

// LoaderFrom returns a Loader which can be used to load data into a BigQuery table.
// The returned Loader may optionally be further configured before its Run method is called.
// See GCSReference and ReaderSource for additional configuration options that
// affect loading.
func (t *Table) LoaderFrom(src LoadSource) *Loader {
	return &Loader{
		c: t.c,
		LoadConfig: LoadConfig{
			Src: src,
			Dst: t,
		},
	}
}

// Run initiates a load job.
func (l *Loader) Run(ctx context.Context) (j *Job, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Load.Run")
	defer func() { trace.EndSpan(ctx, err) }()

	job, media := l.newJob()
	return l.c.insertJob(ctx, job, media)
}

// newJob constructs the bq.Job for this load, along with any media to upload.
func (l *Loader) newJob() (*bq.Job, io.Reader) {
	config, media := l.LoadConfig.toBQ()
	return &bq.Job{
		JobReference: l.JobIDConfig.createJobRef(l.c),
		Configuration: config,
	}, media
}
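
// Illustrative sketch only, not part of this package's API: loading
// newline-delimited JSON from an io.Reader and waiting for the resulting job.
// The file path, client, dataset, and table names below are placeholders.
//
//	f, err := os.Open("data.json")
//	if err != nil {
//		// TODO: handle error.
//	}
//	rs := bigquery.NewReaderSource(f)
//	rs.SourceFormat = bigquery.JSON
//	loader := client.Dataset("mydataset").Table("mytable").LoaderFrom(rs)
//	job, err := loader.Run(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	status, err := job.Wait(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	if err := status.Err(); err != nil {
//		// TODO: handle error.
//	}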