// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongoimport

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/mongodb/mongo-tools-common/bsonutil"
	"github.com/mongodb/mongo-tools-common/log"
	"github.com/mongodb/mongo-tools-common/util"
	"go.mongodb.org/mongo-driver/bson"
	"gopkg.in/tomb.v2"
)

type ParseGrace int

const (
	pgAutoCast ParseGrace = iota
	pgSkipField
	pgSkipRow
	pgStop
)

// ValidatePG ensures the user-provided parseGrace is one of the allowed
// values.
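//
// For example (a sketch; the value normally comes from the user-supplied
// parse-grace option):
//
//	pg, err := ValidatePG("skipRow")
//	if err != nil {
//		// unrecognized value; report it back to the user
//	}
//	_ = pg // pg == pgSkipRow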
func ValidatePG(pg string) (ParseGrace, error) {
	switch pg {
	case "autoCast":
		return pgAutoCast, nil
	case "skipField":
		return pgSkipField, nil
	case "skipRow":
		return pgSkipRow, nil
	case "stop":
		return pgStop, nil
	default:
		return pgAutoCast, fmt.Errorf("invalid parse grace: %s", pg)
	}
}

// ParsePG interprets the user-provided parseGrace, assuming it is valid.
func ParsePG(pg string) (res ParseGrace) {
	res, _ = ValidatePG(pg)
	return
}

// Converter is an interface wrapping the basic Convert method, which returns a
// valid BSON document produced by the underlying implementation.
// If conversion fails, err will be set.
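//
// A minimal sketch of an implementation (hypothetical; the real CSV, TSV and
// JSON converters are defined elsewhere in this package):
//
//	type staticConverter struct{ doc bson.D }
//
//	func (c staticConverter) Convert() (bson.D, error) {
//		return c.doc, nil
//	}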
type Converter interface {
	Convert() (document bson.D, err error)
}

// An importWorker reads Converters from the unprocessedDataChan channel and
// sends processed BSON documents on the processedDocumentChan channel.
type importWorker struct {
	// unprocessedDataChan is used to stream the input data for a worker to process
	unprocessedDataChan chan Converter

	// used to stream the processed document back to the caller
	processedDocumentChan chan bson.D

	// used to synchronise all worker goroutines
	tomb *tomb.Tomb
}

// sizeTracker is an interface for tracking the number of bytes read, which is
// used in mongoimport to feed the progress bar.
type sizeTracker interface {
	Size() int64
}

// sizeTrackingReader implements io.Reader and sizeTracker by wrapping an io.Reader and keeping track
// of the total number of bytes read across all calls to Read().
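//
// Usage sketch (the wrapped file and the progress log line are illustrative):
//
//	str := newSizeTrackingReader(newBomDiscardingReader(file))
//	// ... pass str wherever an io.Reader is expected ...
//	log.Logvf(log.Info, "%d bytes read so far", str.Size())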
type sizeTrackingReader struct {
	bytesRead int64
	reader    io.Reader
}

func (str *sizeTrackingReader) Size() int64 {
	bytes := atomic.LoadInt64(&str.bytesRead)
	return bytes
}

func (str *sizeTrackingReader) Read(p []byte) (n int, err error) {
	n, err = str.reader.Read(p)
	atomic.AddInt64(&str.bytesRead, int64(n))
	return
}

func newSizeTrackingReader(reader io.Reader) *sizeTrackingReader {
	return &sizeTrackingReader{
		reader:    reader,
		bytesRead: 0,
	}
}

var (
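	// UTF8_BOM is the UTF-8 byte order mark, which is discarded from the
	// beginning of input streams before parsing.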
	UTF8_BOM = []byte{0xEF, 0xBB, 0xBF}
)

// bomDiscardingReader wraps an io.Reader, discarding a leading UTF-8 BOM if one is present.
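//
// Usage sketch:
//
//	r := newBomDiscardingReader(os.Stdin)
//	// reads from r never see a leading 0xEF 0xBB 0xBF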
type bomDiscardingReader struct {
	buf     *bufio.Reader
	didRead bool
}

func (bd *bomDiscardingReader) Read(p []byte) (int, error) {
	if !bd.didRead {
		bom, err := bd.buf.Peek(3)
		if err == nil && bytes.Equal(bom, UTF8_BOM) {
			bd.buf.Discard(3) // discard BOM
		}
		bd.didRead = true
	}
	return bd.buf.Read(p)
}

func newBomDiscardingReader(r io.Reader) *bomDiscardingReader {
	return &bomDiscardingReader{buf: bufio.NewReader(r)}
}

// channelQuorumError takes a channel and a quorum, which specifies how many
// messages to receive on that channel before returning. It returns the first
// non-nil error received on the channel, or nil once `quorum` nil errors have
// been received.
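//
// Usage sketch (the worker goroutines and doInserts are hypothetical):
//
//	errChan := make(chan error, 2)
//	go func() { errChan <- doInserts() }()
//	go func() { errChan <- doInserts() }()
//	if err := channelQuorumError(errChan, 2); err != nil {
//		// at least one worker failed; err is the first failure seen
//	}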
func channelQuorumError(ch <-chan error, quorum int) (err error) {
	for i := 0; i < quorum; i++ {
		if err = <-ch; err != nil {
			return
		}
	}
	return
}

// constructUpsertDocument constructs a BSON document to use for upserts
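//
// For example (values are illustrative):
//
//	doc := bson.D{{Key: "a", Value: bson.D{{Key: "b", Value: 1}}}, {Key: "c", Value: 2}}
//	constructUpsertDocument([]string{"a.b", "c"}, doc)
//	// => bson.D{{Key: "a.b", Value: 1}, {Key: "c", Value: 2}}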
func constructUpsertDocument(upsertFields []string, document bson.D) bson.D {
	upsertDocument := bson.D{}
	var hasDocumentKey bool
	for _, key := range upsertFields {
		val := getUpsertValue(key, document)
		if val != nil {
			hasDocumentKey = true
		}
		upsertDocument = append(upsertDocument, bson.E{Key: key, Value: val})
	}
	if !hasDocumentKey {
		return nil
	}
	return upsertDocument
}

// doSequentialStreaming takes a slice of workers, a readDocs (input) channel and
// an outputChan (output) channel. It sequentially writes unprocessed data read from
// the input channel to each worker and then sequentially reads the processed data
// from each worker before passing it on to the output channel
func doSequentialStreaming(workers []*importWorker, readDocs chan Converter, outputChan chan bson.D) {
	numWorkers := len(workers)

	// feed in the data to be processed and do round-robin
	// reads from each worker once processing is completed
	go func() {
		i := 0
		for doc := range readDocs {
			workers[i].unprocessedDataChan <- doc
			i = (i + 1) % numWorkers
		}

		// close the read channels of all the workers
		for i := 0; i < numWorkers; i++ {
			close(workers[i].unprocessedDataChan)
		}
	}()

	// coordinate the order in which the documents are sent over to the
	// main output channel
	numDoneWorkers := 0
	i := 0
	for {
		processedDocument, open := <-workers[i].processedDocumentChan
		if open {
			outputChan <- processedDocument
		} else {
			numDoneWorkers++
		}
		if numDoneWorkers == numWorkers {
			break
		}
		i = (i + 1) % numWorkers
	}
}

// getUpsertValue takes a given BSON document and a given field, and returns the
// field's associated value in the document. The field is specified using dot
// notation for nested fields, e.g. "person.age" would return 34 in the
// document bson.M{"person": bson.M{"age": 34}}, whereas "person.name" would
// return nil.
func getUpsertValue(field string, document bson.D) interface{} {
	index := strings.Index(field, ".")
	if index == -1 {
		// grab the value (ignoring errors because we are okay with nil)
		val, _ := bsonutil.FindValueByKey(field, &document)
		return val
	}
	// recurse into subdocuments
	left := field[0:index]
	subDoc, _ := bsonutil.FindValueByKey(left, &document)
	if subDoc == nil {
		log.Logvf(log.DebugHigh, "no subdoc found for '%v'", left)
		return nil
	}
	switch subDocD := subDoc.(type) {
	case bson.D:
		return getUpsertValue(field[index+1:], subDocD)
	case *bson.D:
		return getUpsertValue(field[index+1:], *subDocD)
	default:
		log.Logvf(log.DebugHigh, "subdoc found for '%v', but couldn't coerce to bson.D", left)
		return nil
	}
}

// removeBlankFields takes a document and returns a new copy in which
// fields with empty/blank values are removed.
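//
// For example (values are illustrative):
//
//	removeBlankFields(bson.D{{Key: "a", Value: ""}, {Key: "b", Value: 1}})
//	// => bson.D{{Key: "b", Value: 1}}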
func removeBlankFields(document bson.D) (newDocument bson.D) {
	for _, keyVal := range document {
		if val, ok := keyVal.Value.(*bson.D); ok {
			keyVal.Value = removeBlankFields(*val)
		}
		if val, ok := keyVal.Value.(string); ok && val == "" {
			continue
		}
		if val, ok := keyVal.Value.(bson.D); ok && val == nil {
			continue
		}
		newDocument = append(newDocument, keyVal)
	}
	return newDocument
}

// setNestedValue takes a nested field - in the form "a.b.c" -
// its associated value, and a document. It then assigns that
// value to the appropriate nested field within the document
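//
// For example (values are illustrative):
//
//	doc := bson.D{}
//	setNestedValue("a.b", 1, &doc)
//	// doc is now bson.D{{Key: "a", Value: &bson.D{{Key: "b", Value: 1}}}}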
func setNestedValue(key string, value interface{}, document *bson.D) {
	index := strings.Index(key, ".")
	if index == -1 {
		*document = append(*document, bson.E{Key: key, Value: value})
		return
	}
	keyName := key[0:index]
	subDocument := &bson.D{}
	elem, err := bsonutil.FindValueByKey(keyName, document)
	if err != nil { // no such key in the document
		elem = nil
	}
	var existingKey bool
	if elem != nil {
		subDocument = elem.(*bson.D)
		existingKey = true
	}
	setNestedValue(key[index+1:], value, subDocument)
	if !existingKey {
		*document = append(*document, bson.E{Key: keyName, Value: subDocument})
	}
}

// streamDocuments concurrently processes data read from the readDocs channel
// and sends the converted documents to the outputChan channel - either in the
// order in which the data was received (if ordered is true) or as soon as each
// document is ready.
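//
// Usage sketch (the producer goroutine and worker count are illustrative):
//
//	readDocs := make(chan Converter, workerBufferSize)
//	outputChan := make(chan bson.D, workerBufferSize)
//	go func() {
//		// push Converters onto readDocs, then close it
//	}()
//	err := streamDocuments(true, 4, readDocs, outputChan)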
func streamDocuments(ordered bool, numDecoders int, readDocs chan Converter, outputChan chan bson.D) (retErr error) {
	if numDecoders == 0 {
		numDecoders = 1
	}
	var importWorkers []*importWorker
	wg := new(sync.WaitGroup)
	importTomb := new(tomb.Tomb)
	inChan := readDocs
	outChan := outputChan
	for i := 0; i < numDecoders; i++ {
		if ordered {
			inChan = make(chan Converter, workerBufferSize)
			outChan = make(chan bson.D, workerBufferSize)
		}
		iw := &importWorker{
			unprocessedDataChan:   inChan,
			processedDocumentChan: outChan,
			tomb:                  importTomb,
		}
		importWorkers = append(importWorkers, iw)
		wg.Add(1)
		go func(iw importWorker) {
			defer wg.Done()
			// record only the first worker error and use it to cause
			// sibling goroutines to terminate immediately
			err := iw.processDocuments(ordered)
			if err != nil && retErr == nil {
				retErr = err
				iw.tomb.Kill(err)
			}
		}(*iw)
	}

	// if ordered, we have to coordinate the sequence in which processed
	// documents are passed to the main read channel
	if ordered {
		doSequentialStreaming(importWorkers, readDocs, outputChan)
	}
	wg.Wait()
	close(outputChan)
	return
}

// coercionError should only be used as a specific error type to check
// whether tokensToBSON wants the row to print
type coercionError struct{}

func (coercionError) Error() string { return "coercionError" }

// tokensToBSON reads in a slice of record tokens - along with the ordered
// column specs - and returns a BSON document for the record.
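//
// For example (a sketch; the column specs would normally be built from the
// header line or the user's field/type specification):
//
//	// specs: []ColumnSpec{{Name: "name", ...}, {Name: "age", ...}}
//	doc, err := tokensToBSON(specs, []string{"Alice", "42"}, 1, false)
//	// doc => bson.D{{Key: "name", Value: "Alice"}, {Key: "age", Value: 42}}
//	// (concrete value types depend on each column's parser)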
func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64, ignoreBlanks bool) (bson.D, error) {
	log.Logvf(log.DebugHigh, "got line: %v", tokens)
	var parsedValue interface{}
	document := bson.D{}
	for index, token := range tokens {
		if token == "" && ignoreBlanks {
			continue
		}
		if index < len(colSpecs) {
			parsedValue, err := colSpecs[index].Parser.Parse(token)
			if err != nil {
				log.Logvf(log.DebugHigh, "parse failure in document #%d for column '%s', "+
					"could not parse token '%s' to type %s",
					numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName)
				switch colSpecs[index].ParseGrace {
				case pgAutoCast:
					parsedValue = autoParse(token)
				case pgSkipField:
					continue
				case pgSkipRow:
					log.Logvf(log.Always, "skipping row #%d: %v", numProcessed, tokens)
					return nil, coercionError{}
				case pgStop:
					return nil, fmt.Errorf("type coercion failure in document #%d for column '%s', "+
						"could not parse token '%s' to type %s",
						numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName)
				}
			}
			if strings.Contains(colSpecs[index].Name, ".") {
				setNestedValue(colSpecs[index].Name, parsedValue, &document)
			} else {
				document = append(document, bson.E{Key: colSpecs[index].Name, Value: parsedValue})
			}
		} else {
			parsedValue = autoParse(token)
			key := "field" + strconv.Itoa(index)
			if util.StringSliceContains(ColumnNames(colSpecs), key) {
				return nil, fmt.Errorf("duplicate field name - on %v - for token #%v ('%v') in document #%v",
					key, index+1, parsedValue, numProcessed)
			}
			document = append(document, bson.E{Key: key, Value: parsedValue})
		}
	}
	return document, nil
}

// validateFields takes a slice of fields and returns an error if the fields
// are invalid; otherwise it returns nil.
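//
// For example:
//
//	validateFields([]string{"a", "b.c"}) // nil
//	validateFields([]string{"a", "a.c"}) // error: fields 'a' and 'a.c' are incompatible
//	validateFields([]string{"$a"})       // error: field '$a' cannot start with a '$'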
func validateFields(fields []string) error {
	fieldsCopy := make([]string, len(fields))
	copy(fieldsCopy, fields)
	sort.Strings(fieldsCopy)

	for index, field := range fieldsCopy {
		if strings.HasSuffix(field, ".") {
			return fmt.Errorf("field '%v' cannot end with a '.'", field)
		}
		if strings.HasPrefix(field, ".") {
			return fmt.Errorf("field '%v' cannot start with a '.'", field)
		}
		if strings.HasPrefix(field, "$") {
			return fmt.Errorf("field '%v' cannot start with a '$'", field)
		}
		if strings.Contains(field, "..") {
			return fmt.Errorf("field '%v' cannot contain consecutive '.' characters", field)
		}
		// NOTE: since fields is sorted, this check ensures that no field
		// is incompatible with another one that occurs further down the list.
		// This is meant to prevent cases where we have fields like "a" and "a.c".
		for _, latterField := range fieldsCopy[index+1:] {
			// NOTE: this means we will not support imports that have fields that
			// include e.g. a, a.b
			if strings.HasPrefix(latterField, field+".") {
				return fmt.Errorf("fields '%v' and '%v' are incompatible", field, latterField)
			}
			// NOTE: this means we will not support imports that have fields like
			// a, a - since this is invalid in MongoDB
			if field == latterField {
				return fmt.Errorf("fields cannot be identical: '%v' and '%v'", field, latterField)
			}
		}
	}
	return nil
}

// validateReaderFields is a helper to validate fields for input readers
func validateReaderFields(fields []string) error {
	if err := validateFields(fields); err != nil {
		return err
	}
	if len(fields) == 1 {
		log.Logvf(log.Info, "using field: %v", fields[0])
	} else {
		log.Logvf(log.Info, "using fields: %v", strings.Join(fields, ","))
	}
	return nil
}

// processDocuments reads from the Converter channel and, for each record, converts it
// to a bson.D document before sending it on the processedDocumentChan channel. Once the
// input channel is closed, the processed document channel is also closed if the worker
// streams its reads in order.
func (iw *importWorker) processDocuments(ordered bool) error {
	if ordered {
		defer close(iw.processedDocumentChan)
	}
	for {
		select {
		case converter, alive := <-iw.unprocessedDataChan:
			if !alive {
				return nil
			}
			document, err := converter.Convert()
			if err != nil {
				return err
			}
			if document == nil {
				continue
			}
			iw.processedDocumentChan <- document
		case <-iw.tomb.Dying():
			return nil
		}
	}
}