// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"io"

	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)

// LoadConfig holds the configuration for a load job.
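//
// LoadConfig is normally not built by hand; a Loader embeds it, and its fields
// are set on the Loader before Run is called. As a rough sketch (the client
// variable and the dataset, table, and column names are placeholders):
//
//	loader := client.Dataset("my_dataset").Table("my_table").LoaderFrom(src)
//	loader.WriteDisposition = bigquery.WriteTruncate
//	loader.TimePartitioning = &bigquery.TimePartitioning{Field: "event_time"}
//	loader.Clustering = &bigquery.Clustering{Fields: []string{"customer_id"}}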
type LoadConfig struct {
	// Src is the source from which data will be loaded.
	Src LoadSource

	// Dst is the table into which the data will be loaded.
	Dst *Table

	// CreateDisposition specifies the circumstances under which the destination table will be created.
	// The default is CreateIfNeeded.
	CreateDisposition TableCreateDisposition

	// WriteDisposition specifies how existing data in the destination table is treated.
	// The default is WriteAppend.
	WriteDisposition TableWriteDisposition

	// The labels associated with this job.
	Labels map[string]string

	// If non-nil, the destination table is partitioned by time.
	TimePartitioning *TimePartitioning

	// If non-nil, the destination table is partitioned by integer range.
	RangePartitioning *RangePartitioning

	// Clustering specifies the data clustering configuration for the destination table.
	Clustering *Clustering

	// Custom encryption configuration (e.g., Cloud KMS keys).
	DestinationEncryptionConfig *EncryptionConfig

	// Allows the schema of the destination table to be updated as a side effect of
	// the load job.
	SchemaUpdateOptions []string

	// For Avro-based loads, controls whether logical type annotations are used.
	// See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
	// for additional information.
	UseAvroLogicalTypes bool

	// For ingestion from datastore backups, ProjectionFields governs which fields
	// are projected from the backup. The default behavior projects all fields.
	ProjectionFields []string
}

func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
	config := &bq.JobConfiguration{
		Labels: l.Labels,
		Load: &bq.JobConfigurationLoad{
			CreateDisposition:                  string(l.CreateDisposition),
			WriteDisposition:                   string(l.WriteDisposition),
			DestinationTable:                   l.Dst.toBQ(),
			TimePartitioning:                   l.TimePartitioning.toBQ(),
			RangePartitioning:                  l.RangePartitioning.toBQ(),
			Clustering:                         l.Clustering.toBQ(),
			DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
			SchemaUpdateOptions:                l.SchemaUpdateOptions,
			UseAvroLogicalTypes:                l.UseAvroLogicalTypes,
			ProjectionFields:                   l.ProjectionFields,
		},
	}
	media := l.Src.populateLoadConfig(config.Load)
	return config, media
}

func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
	lc := &LoadConfig{
		Labels:                      q.Labels,
		CreateDisposition:           TableCreateDisposition(q.Load.CreateDisposition),
		WriteDisposition:            TableWriteDisposition(q.Load.WriteDisposition),
		Dst:                         bqToTable(q.Load.DestinationTable, c),
		TimePartitioning:            bqToTimePartitioning(q.Load.TimePartitioning),
		RangePartitioning:           bqToRangePartitioning(q.Load.RangePartitioning),
		Clustering:                  bqToClustering(q.Load.Clustering),
		DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
		SchemaUpdateOptions:         q.Load.SchemaUpdateOptions,
		UseAvroLogicalTypes:         q.Load.UseAvroLogicalTypes,
		ProjectionFields:            q.Load.ProjectionFields,
	}
	var fc *FileConfig
	if len(q.Load.SourceUris) == 0 {
		s := NewReaderSource(nil)
		fc = &s.FileConfig
		lc.Src = s
	} else {
		s := NewGCSReference(q.Load.SourceUris...)
		fc = &s.FileConfig
		lc.Src = s
	}
	bqPopulateFileConfig(q.Load, fc)
	return lc
}

// A Loader loads data from Google Cloud Storage or from an io.Reader into a BigQuery table.
type Loader struct {
	JobIDConfig
	LoadConfig
	c *Client
}

// A LoadSource represents a source of data that can be loaded into
// a BigQuery table.
//
// This package defines two LoadSources: GCSReference, for Google Cloud Storage
// objects, and ReaderSource, for data read from an io.Reader.
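//
// As an illustration, both are created with package-level constructors; the
// object URI, file name, and formats below are placeholders:
//
//	// Load from Cloud Storage objects.
//	gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.json")
//	gcsRef.SourceFormat = bigquery.JSON
//
//	// Load from anything that implements io.Reader, such as a local file.
//	f, err := os.Open("data.csv")
//	if err != nil {
//		// TODO: handle error.
//	}
//	rs := bigquery.NewReaderSource(f)
//	rs.SkipLeadingRows = 1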
type LoadSource interface {
	// populateLoadConfig fills in the load-specific parts of the given job
	// configuration and returns the media to be uploaded, or nil if there is none.
	populateLoadConfig(*bq.JobConfigurationLoad) io.Reader
}

// LoaderFrom returns a Loader which can be used to load data into a BigQuery table.
// The returned Loader may optionally be further configured before its Run method is called.
// See GCSReference and ReaderSource for additional configuration options that
// affect loading.
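//
// A minimal load from Cloud Storage might look like the following sketch, where
// client, ctx, and the bucket, dataset, and table names are placeholders:
//
//	gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
//	gcsRef.SkipLeadingRows = 1
//	loader := client.Dataset("my_dataset").Table("my_table").LoaderFrom(gcsRef)
//	loader.CreateDisposition = bigquery.CreateIfNeeded
//	job, err := loader.Run(ctx)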
func (t *Table) LoaderFrom(src LoadSource) *Loader {
	return &Loader{
		c: t.c,
		LoadConfig: LoadConfig{
			Src: src,
			Dst: t,
		},
	}
}

// Run initiates a load job.
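// Run returns once the job has been created; it does not wait for the load to
// complete. To block until the job finishes, a typical pattern (sketch only,
// with loader and ctx as placeholders) is to wait on the returned Job and then
// check its final status:
//
//	job, err := loader.Run(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	status, err := job.Wait(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	if status.Err() != nil {
//		// The load job completed with errors.
//	}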
func (l *Loader) Run(ctx context.Context) (j *Job, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Load.Run")
	defer func() { trace.EndSpan(ctx, err) }()

	job, media := l.newJob()
	return l.c.insertJob(ctx, job, media)
}

func (l *Loader) newJob() (*bq.Job, io.Reader) {
	config, media := l.LoadConfig.toBQ()
	return &bq.Job{
		JobReference:  l.JobIDConfig.createJobRef(l.c),
		Configuration: config,
	}, media
}