1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"context"
19	"io"
20
21	"cloud.google.com/go/internal/trace"
22	bq "google.golang.org/api/bigquery/v2"
23)
24
25// LoadConfig holds the configuration for a load job.
// LoadConfig holds the configuration for a load job.
type LoadConfig struct {
	// Src is the source from which data will be loaded.
	// See GCSReference and ReaderSource for the two implementations
	// this package provides.
	Src LoadSource

	// Dst is the table into which the data will be loaded.
	Dst *Table

	// CreateDisposition specifies the circumstances under which the destination table will be created.
	// The default is CreateIfNeeded.
	CreateDisposition TableCreateDisposition

	// WriteDisposition specifies how existing data in the destination table is treated.
	// The default is WriteAppend.
	WriteDisposition TableWriteDisposition

	// The labels associated with this job.
	Labels map[string]string

	// If non-nil, the destination table is partitioned by time.
	TimePartitioning *TimePartitioning

	// Clustering specifies the data clustering configuration for the destination table.
	Clustering *Clustering

	// Custom encryption configuration (e.g., Cloud KMS keys).
	DestinationEncryptionConfig *EncryptionConfig

	// Allows the schema of the destination table to be updated as a side effect of
	// the load job.
	SchemaUpdateOptions []string

	// For Avro-based loads, controls whether logical type annotations are used.
	// See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
	// for additional information.
	UseAvroLogicalTypes bool
}
62
63func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
64	config := &bq.JobConfiguration{
65		Labels: l.Labels,
66		Load: &bq.JobConfigurationLoad{
67			CreateDisposition:                  string(l.CreateDisposition),
68			WriteDisposition:                   string(l.WriteDisposition),
69			DestinationTable:                   l.Dst.toBQ(),
70			TimePartitioning:                   l.TimePartitioning.toBQ(),
71			Clustering:                         l.Clustering.toBQ(),
72			DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
73			SchemaUpdateOptions:                l.SchemaUpdateOptions,
74			UseAvroLogicalTypes:                l.UseAvroLogicalTypes,
75		},
76	}
77	media := l.Src.populateLoadConfig(config.Load)
78	return config, media
79}
80
81func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
82	lc := &LoadConfig{
83		Labels:                      q.Labels,
84		CreateDisposition:           TableCreateDisposition(q.Load.CreateDisposition),
85		WriteDisposition:            TableWriteDisposition(q.Load.WriteDisposition),
86		Dst:                         bqToTable(q.Load.DestinationTable, c),
87		TimePartitioning:            bqToTimePartitioning(q.Load.TimePartitioning),
88		Clustering:                  bqToClustering(q.Load.Clustering),
89		DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
90		SchemaUpdateOptions:         q.Load.SchemaUpdateOptions,
91		UseAvroLogicalTypes:         q.Load.UseAvroLogicalTypes,
92	}
93	var fc *FileConfig
94	if len(q.Load.SourceUris) == 0 {
95		s := NewReaderSource(nil)
96		fc = &s.FileConfig
97		lc.Src = s
98	} else {
99		s := NewGCSReference(q.Load.SourceUris...)
100		fc = &s.FileConfig
101		lc.Src = s
102	}
103	bqPopulateFileConfig(q.Load, fc)
104	return lc
105}
106
107// A Loader loads data from Google Cloud Storage into a BigQuery table.
// A Loader loads data from Google Cloud Storage into a BigQuery table.
// It embeds JobIDConfig (job ID control) and LoadConfig (load settings),
// both of which may be set before calling Run.
type Loader struct {
	JobIDConfig
	LoadConfig
	c *Client
}
113
// A LoadSource represents a source of data that can be loaded into
// a BigQuery table.
//
// This package defines two LoadSources: GCSReference, for Google Cloud Storage
// objects, and ReaderSource, for data read from an io.Reader.
//
// The single unexported method keeps this interface closed: only types
// in this package can implement it.
type LoadSource interface {
	// populateLoadConfig fills in the load configuration and returns the
	// media reader to upload with the job, or nil if there is none.
	populateLoadConfig(*bq.JobConfigurationLoad) io.Reader
}
123
124// LoaderFrom returns a Loader which can be used to load data into a BigQuery table.
125// The returned Loader may optionally be further configured before its Run method is called.
126// See GCSReference and ReaderSource for additional configuration options that
127// affect loading.
128func (t *Table) LoaderFrom(src LoadSource) *Loader {
129	return &Loader{
130		c: t.c,
131		LoadConfig: LoadConfig{
132			Src: src,
133			Dst: t,
134		},
135	}
136}
137
138// Run initiates a load job.
139func (l *Loader) Run(ctx context.Context) (j *Job, err error) {
140	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Load.Run")
141	defer func() { trace.EndSpan(ctx, err) }()
142
143	job, media := l.newJob()
144	return l.c.insertJob(ctx, job, media)
145}
146
147func (l *Loader) newJob() (*bq.Job, io.Reader) {
148	config, media := l.LoadConfig.toBQ()
149	return &bq.Job{
150		JobReference:  l.JobIDConfig.createJobRef(l.c),
151		Configuration: config,
152	}, media
153}
154