1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "strings" 19 "testing" 20 "time" 21 22 "cloud.google.com/go/internal/testutil" 23 "github.com/google/go-cmp/cmp" 24 "github.com/google/go-cmp/cmp/cmpopts" 25 bq "google.golang.org/api/bigquery/v2" 26) 27 28func defaultLoadJob() *bq.Job { 29 return &bq.Job{ 30 JobReference: &bq.JobReference{JobId: "RANDOM", ProjectId: "client-project-id"}, 31 Configuration: &bq.JobConfiguration{ 32 Load: &bq.JobConfigurationLoad{ 33 DestinationTable: &bq.TableReference{ 34 ProjectId: "client-project-id", 35 DatasetId: "dataset-id", 36 TableId: "table-id", 37 }, 38 SourceUris: []string{"uri"}, 39 }, 40 }, 41 } 42} 43 44func stringFieldSchema() *FieldSchema { 45 return &FieldSchema{Name: "fieldname", Type: StringFieldType} 46} 47 48func nestedFieldSchema() *FieldSchema { 49 return &FieldSchema{ 50 Name: "nested", 51 Type: RecordFieldType, 52 Schema: Schema{stringFieldSchema()}, 53 } 54} 55 56func bqStringFieldSchema() *bq.TableFieldSchema { 57 return &bq.TableFieldSchema{ 58 Name: "fieldname", 59 Type: "STRING", 60 } 61} 62 63func bqNestedFieldSchema() *bq.TableFieldSchema { 64 return &bq.TableFieldSchema{ 65 Name: "nested", 66 Type: "RECORD", 67 Fields: []*bq.TableFieldSchema{bqStringFieldSchema()}, 68 } 69} 70 71func TestLoad(t *testing.T) { 72 defer fixRandomID("RANDOM")() 73 c := &Client{projectID: "client-project-id"} 74 75 testCases := []struct { 76 dst *Table 77 src LoadSource 78 jobID string 79 location string 80 config LoadConfig 81 want *bq.Job 82 }{ 83 { 84 dst: c.Dataset("dataset-id").Table("table-id"), 85 src: NewGCSReference("uri"), 86 want: defaultLoadJob(), 87 }, 88 { 89 dst: c.Dataset("dataset-id").Table("table-id"), 90 src: NewGCSReference("uri"), 91 location: "loc", 92 want: func() *bq.Job { 93 j := defaultLoadJob() 94 j.JobReference.Location = "loc" 95 return j 96 }(), 97 }, 98 { 99 dst: c.Dataset("dataset-id").Table("table-id"), 100 jobID: "ajob", 101 config: LoadConfig{ 102 CreateDisposition: CreateNever, 103 WriteDisposition: WriteTruncate, 104 Labels: map[string]string{"a": "b"}, 105 TimePartitioning: &TimePartitioning{Type: MonthPartitioningType, Expiration: 1234 * time.Millisecond}, 106 Clustering: &Clustering{Fields: []string{"cfield1"}}, 107 DestinationEncryptionConfig: &EncryptionConfig{KMSKeyName: "keyName"}, 108 SchemaUpdateOptions: []string{"ALLOW_FIELD_ADDITION"}, 109 }, 110 src: NewGCSReference("uri"), 111 want: func() *bq.Job { 112 j := defaultLoadJob() 113 j.Configuration.Labels = map[string]string{"a": "b"} 114 j.Configuration.Load.CreateDisposition = "CREATE_NEVER" 115 j.Configuration.Load.WriteDisposition = "WRITE_TRUNCATE" 116 j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{ 117 Type: "MONTH", 118 ExpirationMs: 1234, 119 } 120 j.Configuration.Load.Clustering = &bq.Clustering{ 121 Fields: []string{"cfield1"}, 122 } 123 j.Configuration.Load.DestinationEncryptionConfiguration = &bq.EncryptionConfiguration{KmsKeyName: "keyName"} 124 j.JobReference = &bq.JobReference{ 125 JobId: "ajob", 126 ProjectId: "client-project-id", 127 } 128 j.Configuration.Load.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"} 129 return j 130 }(), 131 }, 132 { 133 dst: c.Dataset("dataset-id").Table("table-id"), 134 src: func() *GCSReference { 135 g := NewGCSReference("uri") 136 g.MaxBadRecords = 1 137 g.AllowJaggedRows = true 138 g.AllowQuotedNewlines = true 139 g.IgnoreUnknownValues = true 140 return g 141 }(), 142 want: func() *bq.Job { 143 j := defaultLoadJob() 144 j.Configuration.Load.MaxBadRecords = 1 145 j.Configuration.Load.AllowJaggedRows = true 146 j.Configuration.Load.AllowQuotedNewlines = true 147 j.Configuration.Load.IgnoreUnknownValues = true 148 return j 149 }(), 150 }, 151 { 152 dst: c.Dataset("dataset-id").Table("table-id"), 153 src: func() *GCSReference { 154 g := NewGCSReference("uri") 155 g.Schema = Schema{ 156 stringFieldSchema(), 157 nestedFieldSchema(), 158 } 159 return g 160 }(), 161 want: func() *bq.Job { 162 j := defaultLoadJob() 163 j.Configuration.Load.Schema = &bq.TableSchema{ 164 Fields: []*bq.TableFieldSchema{ 165 bqStringFieldSchema(), 166 bqNestedFieldSchema(), 167 }} 168 return j 169 }(), 170 }, 171 { 172 dst: c.Dataset("dataset-id").Table("table-id"), 173 src: func() *GCSReference { 174 g := NewGCSReference("uri") 175 g.SkipLeadingRows = 1 176 g.SourceFormat = JSON 177 g.Encoding = UTF_8 178 g.FieldDelimiter = "\t" 179 g.Quote = "-" 180 return g 181 }(), 182 want: func() *bq.Job { 183 j := defaultLoadJob() 184 j.Configuration.Load.SkipLeadingRows = 1 185 j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON" 186 j.Configuration.Load.Encoding = "UTF-8" 187 j.Configuration.Load.FieldDelimiter = "\t" 188 hyphen := "-" 189 j.Configuration.Load.Quote = &hyphen 190 return j 191 }(), 192 }, 193 { 194 dst: c.Dataset("dataset-id").Table("table-id"), 195 src: NewGCSReference("uri"), 196 want: func() *bq.Job { 197 j := defaultLoadJob() 198 // Quote is left unset in GCSReference, so should be nil here. 199 j.Configuration.Load.Quote = nil 200 return j 201 }(), 202 }, 203 { 204 dst: c.Dataset("dataset-id").Table("table-id"), 205 src: func() *GCSReference { 206 g := NewGCSReference("uri") 207 g.ForceZeroQuote = true 208 return g 209 }(), 210 want: func() *bq.Job { 211 j := defaultLoadJob() 212 empty := "" 213 j.Configuration.Load.Quote = &empty 214 return j 215 }(), 216 }, 217 { 218 dst: c.Dataset("dataset-id").Table("table-id"), 219 src: func() *ReaderSource { 220 r := NewReaderSource(strings.NewReader("foo")) 221 r.SkipLeadingRows = 1 222 r.SourceFormat = JSON 223 r.Encoding = UTF_8 224 r.FieldDelimiter = "\t" 225 r.Quote = "-" 226 return r 227 }(), 228 want: func() *bq.Job { 229 j := defaultLoadJob() 230 j.Configuration.Load.SourceUris = nil 231 j.Configuration.Load.SkipLeadingRows = 1 232 j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON" 233 j.Configuration.Load.Encoding = "UTF-8" 234 j.Configuration.Load.FieldDelimiter = "\t" 235 hyphen := "-" 236 j.Configuration.Load.Quote = &hyphen 237 return j 238 }(), 239 }, 240 { 241 dst: c.Dataset("dataset-id").Table("table-id"), 242 src: func() *GCSReference { 243 g := NewGCSReference("uri") 244 g.SourceFormat = Avro 245 return g 246 }(), 247 config: LoadConfig{ 248 UseAvroLogicalTypes: true, 249 }, 250 want: func() *bq.Job { 251 j := defaultLoadJob() 252 j.Configuration.Load.SourceFormat = "AVRO" 253 j.Configuration.Load.UseAvroLogicalTypes = true 254 return j 255 }(), 256 }, 257 { 258 dst: c.Dataset("dataset-id").Table("table-id"), 259 src: func() *ReaderSource { 260 r := NewReaderSource(strings.NewReader("foo")) 261 r.SourceFormat = Avro 262 return r 263 }(), 264 config: LoadConfig{ 265 UseAvroLogicalTypes: true, 266 }, 267 want: func() *bq.Job { 268 j := defaultLoadJob() 269 j.Configuration.Load.SourceUris = nil 270 j.Configuration.Load.SourceFormat = "AVRO" 271 j.Configuration.Load.UseAvroLogicalTypes = true 272 return j 273 }(), 274 }, 275 { 276 dst: c.Dataset("dataset-id").Table("table-id"), 277 src: func() *ReaderSource { 278 r := NewReaderSource(strings.NewReader("foo")) 279 return r 280 }(), 281 config: LoadConfig{ 282 TimePartitioning: &TimePartitioning{ 283 Type: HourPartitioningType, 284 Field: "somefield", 285 }, 286 }, 287 want: func() *bq.Job { 288 j := defaultLoadJob() 289 j.Configuration.Load.SourceUris = nil 290 j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{ 291 Field: "somefield", 292 Type: "HOUR", 293 } 294 return j 295 }(), 296 }, 297 { 298 dst: c.Dataset("dataset-id").Table("table-id"), 299 src: func() *ReaderSource { 300 r := NewReaderSource(strings.NewReader("foo")) 301 return r 302 }(), 303 config: LoadConfig{ 304 RangePartitioning: &RangePartitioning{ 305 Field: "somefield", 306 Range: &RangePartitioningRange{ 307 Start: 1, 308 End: 2, 309 Interval: 3, 310 }, 311 }, 312 }, 313 want: func() *bq.Job { 314 j := defaultLoadJob() 315 j.Configuration.Load.SourceUris = nil 316 j.Configuration.Load.RangePartitioning = &bq.RangePartitioning{ 317 Field: "somefield", 318 Range: &bq.RangePartitioningRange{ 319 Start: 1, 320 End: 2, 321 Interval: 3, 322 ForceSendFields: []string{"Start", "End", "Interval"}, 323 }, 324 } 325 return j 326 }(), 327 }, 328 { 329 dst: c.Dataset("dataset-id").Table("table-id"), 330 src: func() *GCSReference { 331 g := NewGCSReference("uri") 332 g.SourceFormat = DatastoreBackup 333 return g 334 }(), 335 config: LoadConfig{ 336 ProjectionFields: []string{"foo", "bar", "baz"}, 337 }, 338 want: func() *bq.Job { 339 j := defaultLoadJob() 340 j.Configuration.Load.SourceFormat = "DATASTORE_BACKUP" 341 j.Configuration.Load.ProjectionFields = []string{"foo", "bar", "baz"} 342 return j 343 }(), 344 }, 345 { 346 dst: c.Dataset("dataset-id").Table("table-id"), 347 src: func() *GCSReference { 348 g := NewGCSReference("uri") 349 g.SourceFormat = Parquet 350 return g 351 }(), 352 config: LoadConfig{ 353 HivePartitioningOptions: &HivePartitioningOptions{ 354 Mode: CustomHivePartitioningMode, 355 SourceURIPrefix: "source_uri", 356 RequirePartitionFilter: true, 357 }, 358 }, 359 want: func() *bq.Job { 360 j := defaultLoadJob() 361 j.Configuration.Load.SourceFormat = "PARQUET" 362 j.Configuration.Load.HivePartitioningOptions = &bq.HivePartitioningOptions{ 363 Mode: "CUSTOM", 364 RequirePartitionFilter: true, 365 SourceUriPrefix: "source_uri", 366 } 367 return j 368 }(), 369 }, 370 { 371 dst: c.Dataset("dataset-id").Table("table-id"), 372 src: func() *GCSReference { 373 g := NewGCSReference("uri") 374 g.SourceFormat = Parquet 375 return g 376 }(), 377 config: LoadConfig{ 378 DecimalTargetTypes: []DecimalTargetType{BigNumericTargetType, NumericTargetType, StringTargetType}, 379 }, 380 want: func() *bq.Job { 381 j := defaultLoadJob() 382 j.Configuration.Load.SourceFormat = "PARQUET" 383 j.Configuration.Load.DecimalTargetTypes = []string{"BIGNUMERIC", "NUMERIC", "STRING"} 384 return j 385 }(), 386 }, 387 } 388 389 for i, tc := range testCases { 390 loader := tc.dst.LoaderFrom(tc.src) 391 loader.JobID = tc.jobID 392 loader.Location = tc.location 393 tc.config.Src = tc.src 394 tc.config.Dst = tc.dst 395 loader.LoadConfig = tc.config 396 got, _ := loader.newJob() 397 checkJob(t, i, got, tc.want) 398 399 jc, err := bqToJobConfig(got.Configuration, c) 400 if err != nil { 401 t.Fatalf("#%d: %v", i, err) 402 } 403 diff := testutil.Diff(jc.(*LoadConfig), &loader.LoadConfig, 404 cmp.AllowUnexported(Table{}, Client{}), 405 cmpopts.IgnoreUnexported(ReaderSource{})) 406 if diff != "" { 407 t.Errorf("#%d: (got=-, want=+:\n%s", i, diff) 408 } 409 } 410} 411