1// Copyright (C) MongoDB, Inc. 2014-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongoimport 8 9import ( 10 "bufio" 11 "bytes" 12 "fmt" 13 "io" 14 "sort" 15 "strconv" 16 "strings" 17 "sync" 18 "sync/atomic" 19 20 "github.com/mongodb/mongo-tools-common/bsonutil" 21 "github.com/mongodb/mongo-tools-common/log" 22 "github.com/mongodb/mongo-tools-common/util" 23 "go.mongodb.org/mongo-driver/bson" 24 "gopkg.in/tomb.v2" 25) 26 27type ParseGrace int 28 29const ( 30 pgAutoCast ParseGrace = iota 31 pgSkipField 32 pgSkipRow 33 pgStop 34) 35 36// ValidatePG ensures the user-provided parseGrace is one of the allowed 37// values. 38func ValidatePG(pg string) (ParseGrace, error) { 39 switch pg { 40 case "autoCast": 41 return pgAutoCast, nil 42 case "skipField": 43 return pgSkipField, nil 44 case "skipRow": 45 return pgSkipRow, nil 46 case "stop": 47 return pgStop, nil 48 default: 49 return pgAutoCast, fmt.Errorf("invalid parse grace: %s", pg) 50 } 51} 52 53// ParsePG interprets the user-provided parseGrace, assuming it is valid. 54func ParsePG(pg string) (res ParseGrace) { 55 res, _ = ValidatePG(pg) 56 return 57} 58 59// Converter is an interface that adds the basic Convert method which returns a 60// valid BSON document that has been converted by the underlying implementation. 61// If conversion fails, err will be set. 62type Converter interface { 63 Convert() (document bson.D, err error) 64} 65 66// An importWorker reads Converter from the unprocessedDataChan channel and 67// sends processed BSON documents on the processedDocumentChan channel 68type importWorker struct { 69 // unprocessedDataChan is used to stream the input data for a worker to process 70 unprocessedDataChan chan Converter 71 72 // used to stream the processed document back to the caller 73 processedDocumentChan chan bson.D 74 75 // used to synchronise all worker goroutines 76 tomb *tomb.Tomb 77} 78 79// an interface for tracking the number of bytes, which is used in mongoimport to feed 80// the progress bar. 81type sizeTracker interface { 82 Size() int64 83} 84 85// sizeTrackingReader implements Reader and sizeTracker by wrapping an io.Reader and keeping track 86// of the total number of bytes read from each call to Read(). 87type sizeTrackingReader struct { 88 bytesRead int64 89 reader io.Reader 90} 91 92func (str *sizeTrackingReader) Size() int64 { 93 bytes := atomic.LoadInt64(&str.bytesRead) 94 return bytes 95} 96 97func (str *sizeTrackingReader) Read(p []byte) (n int, err error) { 98 n, err = str.reader.Read(p) 99 atomic.AddInt64(&str.bytesRead, int64(n)) 100 return 101} 102 103func newSizeTrackingReader(reader io.Reader) *sizeTrackingReader { 104 return &sizeTrackingReader{ 105 reader: reader, 106 bytesRead: 0, 107 } 108} 109 110var ( 111 UTF8_BOM = []byte{0xEF, 0xBB, 0xBF} 112) 113 114// bomDiscardingReader implements and wraps io.Reader, discarding the UTF-8 BOM, if applicable 115type bomDiscardingReader struct { 116 buf *bufio.Reader 117 didRead bool 118} 119 120func (bd *bomDiscardingReader) Read(p []byte) (int, error) { 121 if !bd.didRead { 122 bom, err := bd.buf.Peek(3) 123 if err == nil && bytes.Equal(bom, UTF8_BOM) { 124 bd.buf.Read(make([]byte, 3)) // discard BOM 125 } 126 bd.didRead = true 127 } 128 return bd.buf.Read(p) 129} 130 131func newBomDiscardingReader(r io.Reader) *bomDiscardingReader { 132 return &bomDiscardingReader{buf: bufio.NewReader(r)} 133} 134 135// channelQuorumError takes a channel and a quorum - which specifies how many 136// messages to receive on that channel before returning. It either returns the 137// first non-nil error received on the channel or nil if up to `quorum` nil 138// errors are received 139func channelQuorumError(ch <-chan error, quorum int) (err error) { 140 for i := 0; i < quorum; i++ { 141 if err = <-ch; err != nil { 142 return 143 } 144 } 145 return 146} 147 148// constructUpsertDocument constructs a BSON document to use for upserts 149func constructUpsertDocument(upsertFields []string, document bson.D) bson.D { 150 upsertDocument := bson.D{} 151 var hasDocumentKey bool 152 for _, key := range upsertFields { 153 val := getUpsertValue(key, document) 154 if val != nil { 155 hasDocumentKey = true 156 } 157 upsertDocument = append(upsertDocument, bson.E{Key: key, Value: val}) 158 } 159 if !hasDocumentKey { 160 return nil 161 } 162 return upsertDocument 163} 164 165// doSequentialStreaming takes a slice of workers, a readDocs (input) channel and 166// an outputChan (output) channel. It sequentially writes unprocessed data read from 167// the input channel to each worker and then sequentially reads the processed data 168// from each worker before passing it on to the output channel 169func doSequentialStreaming(workers []*importWorker, readDocs chan Converter, outputChan chan bson.D) { 170 numWorkers := len(workers) 171 172 // feed in the data to be processed and do round-robin 173 // reads from each worker once processing is completed 174 go func() { 175 i := 0 176 for doc := range readDocs { 177 workers[i].unprocessedDataChan <- doc 178 i = (i + 1) % numWorkers 179 } 180 181 // close the read channels of all the workers 182 for i := 0; i < numWorkers; i++ { 183 close(workers[i].unprocessedDataChan) 184 } 185 }() 186 187 // coordinate the order in which the documents are sent over to the 188 // main output channel 189 numDoneWorkers := 0 190 i := 0 191 for { 192 processedDocument, open := <-workers[i].processedDocumentChan 193 if open { 194 outputChan <- processedDocument 195 } else { 196 numDoneWorkers++ 197 } 198 if numDoneWorkers == numWorkers { 199 break 200 } 201 i = (i + 1) % numWorkers 202 } 203} 204 205// getUpsertValue takes a given BSON document and a given field, and returns the 206// field's associated value in the document. The field is specified using dot 207// notation for nested fields. e.g. "person.age" would return 34 would return 208// 34 in the document: bson.M{"person": bson.M{"age": 34}} whereas, 209// "person.name" would return nil 210func getUpsertValue(field string, document bson.D) interface{} { 211 index := strings.Index(field, ".") 212 if index == -1 { 213 // grab the value (ignoring errors because we are okay with nil) 214 val, _ := bsonutil.FindValueByKey(field, &document) 215 return val 216 } 217 // recurse into subdocuments 218 left := field[0:index] 219 subDoc, _ := bsonutil.FindValueByKey(left, &document) 220 if subDoc == nil { 221 log.Logvf(log.DebugHigh, "no subdoc found for '%v'", left) 222 return nil 223 } 224 switch subDoc.(type) { 225 case bson.D: 226 subDocD := subDoc.(bson.D) 227 return getUpsertValue(field[index+1:], subDocD) 228 case *bson.D: 229 subDocD := subDoc.(*bson.D) 230 return getUpsertValue(field[index+1:], *subDocD) 231 default: 232 log.Logvf(log.DebugHigh, "subdoc found for '%v', but couldn't coerce to bson.D", left) 233 return nil 234 } 235} 236 237// removeBlankFields takes document and returns a new copy in which 238// fields with empty/blank values are removed 239func removeBlankFields(document bson.D) (newDocument bson.D) { 240 for _, keyVal := range document { 241 if val, ok := keyVal.Value.(*bson.D); ok { 242 keyVal.Value = removeBlankFields(*val) 243 } 244 if val, ok := keyVal.Value.(string); ok && val == "" { 245 continue 246 } 247 if val, ok := keyVal.Value.(bson.D); ok && val == nil { 248 continue 249 } 250 newDocument = append(newDocument, keyVal) 251 } 252 return newDocument 253} 254 255// setNestedValue takes a nested field - in the form "a.b.c" - 256// its associated value, and a document. It then assigns that 257// value to the appropriate nested field within the document 258func setNestedValue(key string, value interface{}, document *bson.D) { 259 index := strings.Index(key, ".") 260 if index == -1 { 261 *document = append(*document, bson.E{Key: key, Value: value}) 262 return 263 } 264 keyName := key[0:index] 265 subDocument := &bson.D{} 266 elem, err := bsonutil.FindValueByKey(keyName, document) 267 if err != nil { // no such key in the document 268 elem = nil 269 } 270 var existingKey bool 271 if elem != nil { 272 subDocument = elem.(*bson.D) 273 existingKey = true 274 } 275 setNestedValue(key[index+1:], value, subDocument) 276 if !existingKey { 277 *document = append(*document, bson.E{Key: keyName, Value: subDocument}) 278 } 279} 280 281// streamDocuments concurrently processes data gotten from the inputChan 282// channel in parallel and then sends over the processed data to the outputChan 283// channel - either in sequence or concurrently (depending on the value of 284// ordered) - in which the data was received 285func streamDocuments(ordered bool, numDecoders int, readDocs chan Converter, outputChan chan bson.D) (retErr error) { 286 if numDecoders == 0 { 287 numDecoders = 1 288 } 289 var importWorkers []*importWorker 290 wg := new(sync.WaitGroup) 291 importTomb := new(tomb.Tomb) 292 inChan := readDocs 293 outChan := outputChan 294 for i := 0; i < numDecoders; i++ { 295 if ordered { 296 inChan = make(chan Converter, workerBufferSize) 297 outChan = make(chan bson.D, workerBufferSize) 298 } 299 iw := &importWorker{ 300 unprocessedDataChan: inChan, 301 processedDocumentChan: outChan, 302 tomb: importTomb, 303 } 304 importWorkers = append(importWorkers, iw) 305 wg.Add(1) 306 go func(iw importWorker) { 307 defer wg.Done() 308 // only set the first worker error and cause sibling goroutines 309 // to terminate immediately 310 err := iw.processDocuments(ordered) 311 if err != nil && retErr == nil { 312 retErr = err 313 iw.tomb.Kill(err) 314 } 315 }(*iw) 316 } 317 318 // if ordered, we have to coordinate the sequence in which processed 319 // documents are passed to the main read channel 320 if ordered { 321 doSequentialStreaming(importWorkers, readDocs, outputChan) 322 } 323 wg.Wait() 324 close(outputChan) 325 return 326} 327 328// coercionError should only be used as a specific error type to check 329// whether tokensToBSON wants the row to print 330type coercionError struct{} 331 332func (coercionError) Error() string { return "coercionError" } 333 334// tokensToBSON reads in slice of records - along with ordered column names - 335// and returns a BSON document for the record. 336func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64, ignoreBlanks bool) (bson.D, error) { 337 log.Logvf(log.DebugHigh, "got line: %v", tokens) 338 var parsedValue interface{} 339 document := bson.D{} 340 for index, token := range tokens { 341 if token == "" && ignoreBlanks { 342 continue 343 } 344 if index < len(colSpecs) { 345 parsedValue, err := colSpecs[index].Parser.Parse(token) 346 if err != nil { 347 log.Logvf(log.DebugHigh, "parse failure in document #%d for column '%s',"+ 348 "could not parse token '%s' to type %s", 349 numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName) 350 switch colSpecs[index].ParseGrace { 351 case pgAutoCast: 352 parsedValue = autoParse(token) 353 case pgSkipField: 354 continue 355 case pgSkipRow: 356 log.Logvf(log.Always, "skipping row #%d: %v", numProcessed, tokens) 357 return nil, coercionError{} 358 case pgStop: 359 return nil, fmt.Errorf("type coercion failure in document #%d for column '%s', "+ 360 "could not parse token '%s' to type %s", 361 numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName) 362 } 363 } 364 if strings.Index(colSpecs[index].Name, ".") != -1 { 365 setNestedValue(colSpecs[index].Name, parsedValue, &document) 366 } else { 367 document = append(document, bson.E{Key: colSpecs[index].Name, Value: parsedValue}) 368 } 369 } else { 370 parsedValue = autoParse(token) 371 key := "field" + strconv.Itoa(index) 372 if util.StringSliceContains(ColumnNames(colSpecs), key) { 373 return nil, fmt.Errorf("duplicate field name - on %v - for token #%v ('%v') in document #%v", 374 key, index+1, parsedValue, numProcessed) 375 } 376 document = append(document, bson.E{Key: key, Value: parsedValue}) 377 } 378 } 379 return document, nil 380} 381 382// validateFields takes a slice of fields and returns an error if the fields 383// are invalid, returns nil otherwise 384func validateFields(fields []string) error { 385 fieldsCopy := make([]string, len(fields), len(fields)) 386 copy(fieldsCopy, fields) 387 sort.Sort(sort.StringSlice(fieldsCopy)) 388 389 for index, field := range fieldsCopy { 390 if strings.HasSuffix(field, ".") { 391 return fmt.Errorf("field '%v' cannot end with a '.'", field) 392 } 393 if strings.HasPrefix(field, ".") { 394 return fmt.Errorf("field '%v' cannot start with a '.'", field) 395 } 396 if strings.HasPrefix(field, "$") { 397 return fmt.Errorf("field '%v' cannot start with a '$'", field) 398 } 399 if strings.Contains(field, "..") { 400 return fmt.Errorf("field '%v' cannot contain consecutive '.' characters", field) 401 } 402 // NOTE: since fields is sorted, this check ensures that no field 403 // is incompatible with another one that occurs further down the list. 404 // meant to prevent cases where we have fields like "a" and "a.c" 405 for _, latterField := range fieldsCopy[index+1:] { 406 // NOTE: this means we will not support imports that have fields that 407 // include e.g. a, a.b 408 if strings.HasPrefix(latterField, field+".") { 409 return fmt.Errorf("fields '%v' and '%v' are incompatible", field, latterField) 410 } 411 // NOTE: this means we will not support imports that have fields like 412 // a, a - since this is invalid in MongoDB 413 if field == latterField { 414 return fmt.Errorf("fields cannot be identical: '%v' and '%v'", field, latterField) 415 } 416 } 417 } 418 return nil 419} 420 421// validateReaderFields is a helper to validate fields for input readers 422func validateReaderFields(fields []string) error { 423 if err := validateFields(fields); err != nil { 424 return err 425 } 426 if len(fields) == 1 { 427 log.Logvf(log.Info, "using field: %v", fields[0]) 428 } else { 429 log.Logvf(log.Info, "using fields: %v", strings.Join(fields, ",")) 430 } 431 return nil 432} 433 434// processDocuments reads from the Converter channel and for each record, converts it 435// to a bson.D document before sending it on the processedDocumentChan channel. Once the 436// input channel is closed the processed channel is also closed if the worker streams its 437// reads in order 438func (iw *importWorker) processDocuments(ordered bool) error { 439 if ordered { 440 defer close(iw.processedDocumentChan) 441 } 442 for { 443 select { 444 case converter, alive := <-iw.unprocessedDataChan: 445 if !alive { 446 return nil 447 } 448 document, err := converter.Convert() 449 if err != nil { 450 return err 451 } 452 if document == nil { 453 continue 454 } 455 iw.processedDocumentChan <- document 456 case <-iw.tomb.Dying(): 457 return nil 458 } 459 } 460} 461