1// Copyright 2015 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bigquery
16
17import (
18	"strings"
19	"testing"
20	"time"
21
22	"cloud.google.com/go/internal/testutil"
23	"github.com/google/go-cmp/cmp"
24	"github.com/google/go-cmp/cmp/cmpopts"
25	bq "google.golang.org/api/bigquery/v2"
26)
27
28func defaultLoadJob() *bq.Job {
29	return &bq.Job{
30		JobReference: &bq.JobReference{JobId: "RANDOM", ProjectId: "client-project-id"},
31		Configuration: &bq.JobConfiguration{
32			Load: &bq.JobConfigurationLoad{
33				DestinationTable: &bq.TableReference{
34					ProjectId: "client-project-id",
35					DatasetId: "dataset-id",
36					TableId:   "table-id",
37				},
38				SourceUris: []string{"uri"},
39			},
40		},
41	}
42}
43
44func stringFieldSchema() *FieldSchema {
45	return &FieldSchema{Name: "fieldname", Type: StringFieldType}
46}
47
48func nestedFieldSchema() *FieldSchema {
49	return &FieldSchema{
50		Name:   "nested",
51		Type:   RecordFieldType,
52		Schema: Schema{stringFieldSchema()},
53	}
54}
55
56func bqStringFieldSchema() *bq.TableFieldSchema {
57	return &bq.TableFieldSchema{
58		Name: "fieldname",
59		Type: "STRING",
60	}
61}
62
63func bqNestedFieldSchema() *bq.TableFieldSchema {
64	return &bq.TableFieldSchema{
65		Name:   "nested",
66		Type:   "RECORD",
67		Fields: []*bq.TableFieldSchema{bqStringFieldSchema()},
68	}
69}
70
71func TestLoad(t *testing.T) {
72	defer fixRandomID("RANDOM")()
73	c := &Client{projectID: "client-project-id"}
74
75	testCases := []struct {
76		dst      *Table
77		src      LoadSource
78		jobID    string
79		location string
80		config   LoadConfig
81		want     *bq.Job
82	}{
83		{
84			dst:  c.Dataset("dataset-id").Table("table-id"),
85			src:  NewGCSReference("uri"),
86			want: defaultLoadJob(),
87		},
88		{
89			dst:      c.Dataset("dataset-id").Table("table-id"),
90			src:      NewGCSReference("uri"),
91			location: "loc",
92			want: func() *bq.Job {
93				j := defaultLoadJob()
94				j.JobReference.Location = "loc"
95				return j
96			}(),
97		},
98		{
99			dst:   c.Dataset("dataset-id").Table("table-id"),
100			jobID: "ajob",
101			config: LoadConfig{
102				CreateDisposition:           CreateNever,
103				WriteDisposition:            WriteTruncate,
104				Labels:                      map[string]string{"a": "b"},
105				TimePartitioning:            &TimePartitioning{Type: MonthPartitioningType, Expiration: 1234 * time.Millisecond},
106				Clustering:                  &Clustering{Fields: []string{"cfield1"}},
107				DestinationEncryptionConfig: &EncryptionConfig{KMSKeyName: "keyName"},
108				SchemaUpdateOptions:         []string{"ALLOW_FIELD_ADDITION"},
109			},
110			src: NewGCSReference("uri"),
111			want: func() *bq.Job {
112				j := defaultLoadJob()
113				j.Configuration.Labels = map[string]string{"a": "b"}
114				j.Configuration.Load.CreateDisposition = "CREATE_NEVER"
115				j.Configuration.Load.WriteDisposition = "WRITE_TRUNCATE"
116				j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
117					Type:         "MONTH",
118					ExpirationMs: 1234,
119				}
120				j.Configuration.Load.Clustering = &bq.Clustering{
121					Fields: []string{"cfield1"},
122				}
123				j.Configuration.Load.DestinationEncryptionConfiguration = &bq.EncryptionConfiguration{KmsKeyName: "keyName"}
124				j.JobReference = &bq.JobReference{
125					JobId:     "ajob",
126					ProjectId: "client-project-id",
127				}
128				j.Configuration.Load.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"}
129				return j
130			}(),
131		},
132		{
133			dst: c.Dataset("dataset-id").Table("table-id"),
134			src: func() *GCSReference {
135				g := NewGCSReference("uri")
136				g.MaxBadRecords = 1
137				g.AllowJaggedRows = true
138				g.AllowQuotedNewlines = true
139				g.IgnoreUnknownValues = true
140				return g
141			}(),
142			want: func() *bq.Job {
143				j := defaultLoadJob()
144				j.Configuration.Load.MaxBadRecords = 1
145				j.Configuration.Load.AllowJaggedRows = true
146				j.Configuration.Load.AllowQuotedNewlines = true
147				j.Configuration.Load.IgnoreUnknownValues = true
148				return j
149			}(),
150		},
151		{
152			dst: c.Dataset("dataset-id").Table("table-id"),
153			src: func() *GCSReference {
154				g := NewGCSReference("uri")
155				g.Schema = Schema{
156					stringFieldSchema(),
157					nestedFieldSchema(),
158				}
159				return g
160			}(),
161			want: func() *bq.Job {
162				j := defaultLoadJob()
163				j.Configuration.Load.Schema = &bq.TableSchema{
164					Fields: []*bq.TableFieldSchema{
165						bqStringFieldSchema(),
166						bqNestedFieldSchema(),
167					}}
168				return j
169			}(),
170		},
171		{
172			dst: c.Dataset("dataset-id").Table("table-id"),
173			src: func() *GCSReference {
174				g := NewGCSReference("uri")
175				g.SkipLeadingRows = 1
176				g.SourceFormat = JSON
177				g.Encoding = UTF_8
178				g.FieldDelimiter = "\t"
179				g.Quote = "-"
180				return g
181			}(),
182			want: func() *bq.Job {
183				j := defaultLoadJob()
184				j.Configuration.Load.SkipLeadingRows = 1
185				j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON"
186				j.Configuration.Load.Encoding = "UTF-8"
187				j.Configuration.Load.FieldDelimiter = "\t"
188				hyphen := "-"
189				j.Configuration.Load.Quote = &hyphen
190				return j
191			}(),
192		},
193		{
194			dst: c.Dataset("dataset-id").Table("table-id"),
195			src: NewGCSReference("uri"),
196			want: func() *bq.Job {
197				j := defaultLoadJob()
198				// Quote is left unset in GCSReference, so should be nil here.
199				j.Configuration.Load.Quote = nil
200				return j
201			}(),
202		},
203		{
204			dst: c.Dataset("dataset-id").Table("table-id"),
205			src: func() *GCSReference {
206				g := NewGCSReference("uri")
207				g.ForceZeroQuote = true
208				return g
209			}(),
210			want: func() *bq.Job {
211				j := defaultLoadJob()
212				empty := ""
213				j.Configuration.Load.Quote = &empty
214				return j
215			}(),
216		},
217		{
218			dst: c.Dataset("dataset-id").Table("table-id"),
219			src: func() *ReaderSource {
220				r := NewReaderSource(strings.NewReader("foo"))
221				r.SkipLeadingRows = 1
222				r.SourceFormat = JSON
223				r.Encoding = UTF_8
224				r.FieldDelimiter = "\t"
225				r.Quote = "-"
226				return r
227			}(),
228			want: func() *bq.Job {
229				j := defaultLoadJob()
230				j.Configuration.Load.SourceUris = nil
231				j.Configuration.Load.SkipLeadingRows = 1
232				j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON"
233				j.Configuration.Load.Encoding = "UTF-8"
234				j.Configuration.Load.FieldDelimiter = "\t"
235				hyphen := "-"
236				j.Configuration.Load.Quote = &hyphen
237				return j
238			}(),
239		},
240		{
241			dst: c.Dataset("dataset-id").Table("table-id"),
242			src: func() *GCSReference {
243				g := NewGCSReference("uri")
244				g.SourceFormat = Avro
245				return g
246			}(),
247			config: LoadConfig{
248				UseAvroLogicalTypes: true,
249			},
250			want: func() *bq.Job {
251				j := defaultLoadJob()
252				j.Configuration.Load.SourceFormat = "AVRO"
253				j.Configuration.Load.UseAvroLogicalTypes = true
254				return j
255			}(),
256		},
257		{
258			dst: c.Dataset("dataset-id").Table("table-id"),
259			src: func() *ReaderSource {
260				r := NewReaderSource(strings.NewReader("foo"))
261				r.SourceFormat = Avro
262				return r
263			}(),
264			config: LoadConfig{
265				UseAvroLogicalTypes: true,
266			},
267			want: func() *bq.Job {
268				j := defaultLoadJob()
269				j.Configuration.Load.SourceUris = nil
270				j.Configuration.Load.SourceFormat = "AVRO"
271				j.Configuration.Load.UseAvroLogicalTypes = true
272				return j
273			}(),
274		},
275		{
276			dst: c.Dataset("dataset-id").Table("table-id"),
277			src: func() *ReaderSource {
278				r := NewReaderSource(strings.NewReader("foo"))
279				return r
280			}(),
281			config: LoadConfig{
282				TimePartitioning: &TimePartitioning{
283					Type:  HourPartitioningType,
284					Field: "somefield",
285				},
286			},
287			want: func() *bq.Job {
288				j := defaultLoadJob()
289				j.Configuration.Load.SourceUris = nil
290				j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
291					Field: "somefield",
292					Type:  "HOUR",
293				}
294				return j
295			}(),
296		},
297		{
298			dst: c.Dataset("dataset-id").Table("table-id"),
299			src: func() *ReaderSource {
300				r := NewReaderSource(strings.NewReader("foo"))
301				return r
302			}(),
303			config: LoadConfig{
304				RangePartitioning: &RangePartitioning{
305					Field: "somefield",
306					Range: &RangePartitioningRange{
307						Start:    1,
308						End:      2,
309						Interval: 3,
310					},
311				},
312			},
313			want: func() *bq.Job {
314				j := defaultLoadJob()
315				j.Configuration.Load.SourceUris = nil
316				j.Configuration.Load.RangePartitioning = &bq.RangePartitioning{
317					Field: "somefield",
318					Range: &bq.RangePartitioningRange{
319						Start:           1,
320						End:             2,
321						Interval:        3,
322						ForceSendFields: []string{"Start", "End", "Interval"},
323					},
324				}
325				return j
326			}(),
327		},
328		{
329			dst: c.Dataset("dataset-id").Table("table-id"),
330			src: func() *GCSReference {
331				g := NewGCSReference("uri")
332				g.SourceFormat = DatastoreBackup
333				return g
334			}(),
335			config: LoadConfig{
336				ProjectionFields: []string{"foo", "bar", "baz"},
337			},
338			want: func() *bq.Job {
339				j := defaultLoadJob()
340				j.Configuration.Load.SourceFormat = "DATASTORE_BACKUP"
341				j.Configuration.Load.ProjectionFields = []string{"foo", "bar", "baz"}
342				return j
343			}(),
344		},
345		{
346			dst: c.Dataset("dataset-id").Table("table-id"),
347			src: func() *GCSReference {
348				g := NewGCSReference("uri")
349				g.SourceFormat = Parquet
350				return g
351			}(),
352			config: LoadConfig{
353				HivePartitioningOptions: &HivePartitioningOptions{
354					Mode:                   CustomHivePartitioningMode,
355					SourceURIPrefix:        "source_uri",
356					RequirePartitionFilter: true,
357				},
358			},
359			want: func() *bq.Job {
360				j := defaultLoadJob()
361				j.Configuration.Load.SourceFormat = "PARQUET"
362				j.Configuration.Load.HivePartitioningOptions = &bq.HivePartitioningOptions{
363					Mode:                   "CUSTOM",
364					RequirePartitionFilter: true,
365					SourceUriPrefix:        "source_uri",
366				}
367				return j
368			}(),
369		},
370		{
371			dst: c.Dataset("dataset-id").Table("table-id"),
372			src: func() *GCSReference {
373				g := NewGCSReference("uri")
374				g.SourceFormat = Parquet
375				return g
376			}(),
377			config: LoadConfig{
378				DecimalTargetTypes: []DecimalTargetType{BigNumericTargetType, NumericTargetType, StringTargetType},
379			},
380			want: func() *bq.Job {
381				j := defaultLoadJob()
382				j.Configuration.Load.SourceFormat = "PARQUET"
383				j.Configuration.Load.DecimalTargetTypes = []string{"BIGNUMERIC", "NUMERIC", "STRING"}
384				return j
385			}(),
386		},
387	}
388
389	for i, tc := range testCases {
390		loader := tc.dst.LoaderFrom(tc.src)
391		loader.JobID = tc.jobID
392		loader.Location = tc.location
393		tc.config.Src = tc.src
394		tc.config.Dst = tc.dst
395		loader.LoadConfig = tc.config
396		got, _ := loader.newJob()
397		checkJob(t, i, got, tc.want)
398
399		jc, err := bqToJobConfig(got.Configuration, c)
400		if err != nil {
401			t.Fatalf("#%d: %v", i, err)
402		}
403		diff := testutil.Diff(jc.(*LoadConfig), &loader.LoadConfig,
404			cmp.AllowUnexported(Table{}, Client{}),
405			cmpopts.IgnoreUnexported(ReaderSource{}))
406		if diff != "" {
407			t.Errorf("#%d: (got=-, want=+:\n%s", i, diff)
408		}
409	}
410}
411