1// Copyright (C) MongoDB, Inc. 2014-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongoimport
8
9import (
10	"errors"
11	"fmt"
12	"io"
13	"strings"
14
15	"github.com/mongodb/mongo-tools-common/bsonutil"
16	"github.com/mongodb/mongo-tools-common/json"
17	"github.com/mongodb/mongo-tools-common/log"
18	"go.mongodb.org/mongo-driver/bson"
19)
20
// JSONInputReader is an implementation of InputReader that reads documents
// in JSON format. In array mode (isArray) the input must be a single
// top-level JSON array whose elements are the documents; otherwise the input
// is read as a sequence of top-level JSON documents.
type JSONInputReader struct {
	// isArray indicates if the JSON import is an array of JSON documents
	// or not
	isArray bool

	// decoder is used to read the next valid JSON documents from the input
	// source; in array mode readJSONArraySeparator also reads from its
	// buffered data and underlying reader directly.
	decoder *json.Decoder

	// numProcessed is the number of JSON documents read so far. It doubles as
	// the 0-based index given to the next document, and is incremented once
	// more when a read error is reported so that error messages carry a
	// 1-based document number.
	numProcessed uint64

	// readOpeningBracket indicates whether readJSONArraySeparator has
	// completed successfully at least once, i.e. whether the opening bracket
	// has been consumed from the input source. It is used to report
	// ErrNoOpeningBracket when a ']' appears before any '['.
	readOpeningBracket bool

	// expectedByte is used to store the next expected valid character for JSON
	// array imports: the opening bracket before the first document and the
	// array separator afterwards.
	expectedByte byte

	// bytesFromReader is a one-byte scratch buffer holding the byte most
	// recently read by readJSONArraySeparator for JSON array imports.
	bytesFromReader []byte

	// separatorReader is used for JSON arrays to look for a valid array
	// separator. It is meant to be a reader consisting of the decoder's buffer
	// and the underlying reader.
	// NOTE(review): readJSONArraySeparator in this file builds a local reader
	// instead and never sets this field — confirm whether it is used elsewhere.
	separatorReader io.Reader

	// embedded sizeTracker exposes the Size() method to check the number of bytes read so far
	sizeTracker

	// numDecoders is the number of concurrent goroutines to use for decoding;
	// StreamDocument also uses it as the buffer size of its raw-document channel.
	numDecoders int

	// legacyExtJSON specifies whether or not the legacy extended JSON format should be used.
	legacyExtJSON bool
}
61
// JSONConverter implements the Converter interface for JSON input. It holds
// one raw JSON document as scanned by JSONInputReader.StreamDocument:
//   - data is the document's raw bytes,
//   - index is its 0-based position in the input (the reader's document
//     count at the time it was read),
//   - legacyExtJSON selects the legacy extended JSON parser in Convert
//     instead of bson.UnmarshalExtJSON.
type JSONConverter struct {
	data          []byte
	index         uint64
	legacyExtJSON bool
}
68
var (
	// ErrNoOpeningBracket means that the input source did not start with an
	// opening bracket '['. It is returned only when importing a JSON array
	// (--jsonArray): when a byte other than whitespace, '[', ',' or ']' is
	// found while the '[' is still expected, or when a ']' is found before
	// any '[' has been consumed.
	ErrNoOpeningBracket = errors.New("bad JSON array format - found no " +
		"opening bracket '[' in input source")

	// ErrNoClosingBracket means that the input source ended without a
	// closing bracket ']'. It is returned only when importing a JSON array
	// (--jsonArray), whenever the input ends while the next array separator
	// (or the opening '[') is still expected.
	ErrNoClosingBracket = errors.New("bad JSON array format - found no " +
		"closing bracket ']' in input source")
)
80
81// NewJSONInputReader creates a new JSONInputReader in array mode if specified,
82// configured to read data to the given io.Reader.
83func NewJSONInputReader(isArray bool, legacyExtJSON bool, in io.Reader, numDecoders int) *JSONInputReader {
84	szCount := newSizeTrackingReader(newBomDiscardingReader(in))
85	return &JSONInputReader{
86		isArray:            isArray,
87		sizeTracker:        szCount,
88		decoder:            json.NewDecoder(szCount),
89		readOpeningBracket: false,
90		bytesFromReader:    make([]byte, 1),
91		numDecoders:        numDecoders,
92		legacyExtJSON:      legacyExtJSON,
93	}
94}
95
96// ReadAndValidateHeader is a no-op for JSON imports; always returns nil.
97func (r *JSONInputReader) ReadAndValidateHeader() error {
98	return nil
99}
100
101// ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil.
102func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error {
103	return nil
104}
105
106// StreamDocument takes a boolean indicating if the documents should be streamed
107// in read order and a channel on which to stream the documents processed from
108// the underlying reader. Returns a non-nil error if encountered
109func (r *JSONInputReader) StreamDocument(ordered bool, readChan chan bson.D) (retErr error) {
110	rawChan := make(chan Converter, r.numDecoders)
111	jsonErrChan := make(chan error)
112
113	// begin reading from source
114	go func() {
115		var err error
116		for {
117			if r.isArray {
118				if err = r.readJSONArraySeparator(); err != nil {
119					close(rawChan)
120					if err == io.EOF {
121						jsonErrChan <- nil
122					} else {
123						r.numProcessed++
124						jsonErrChan <- fmt.Errorf("error reading separator after document #%v: %v", r.numProcessed, err)
125					}
126					return
127				}
128			}
129			rawBytes, err := r.decoder.ScanObject()
130			if err != nil {
131				close(rawChan)
132				if err == io.EOF {
133					jsonErrChan <- nil
134				} else {
135					r.numProcessed++
136					jsonErrChan <- fmt.Errorf("error processing document #%v: %v", r.numProcessed, err)
137				}
138				return
139			}
140			rawChan <- JSONConverter{
141				data:          rawBytes,
142				index:         r.numProcessed,
143				legacyExtJSON: r.legacyExtJSON,
144			}
145			r.numProcessed++
146		}
147	}()
148
149	// begin processing read bytes
150	go func() {
151		jsonErrChan <- streamDocuments(ordered, r.numDecoders, rawChan, readChan)
152	}()
153
154	return channelQuorumError(jsonErrChan, 2)
155}
156
157// Convert implements the Converter interface for JSON input. It converts a
158// JSONConverter struct to a BSON document.
159func (c JSONConverter) Convert() (bson.D, error) {
160	if c.legacyExtJSON {
161		return c.convertLegacyExtJSON()
162	}
163
164	var doc bson.D
165	if err := bson.UnmarshalExtJSON(c.data, false, &doc); err != nil {
166		return nil, err
167	}
168
169	return doc, nil
170}
171
172func (c JSONConverter) convertLegacyExtJSON() (bson.D, error) {
173	document, err := json.UnmarshalBsonD(c.data)
174	if err != nil {
175		return nil, fmt.Errorf("error unmarshaling bytes on document #%v: %v", c.index, err)
176	}
177	log.Logvf(log.DebugHigh, "got line: %v", document)
178
179	bsonD, err := bsonutil.GetExtendedBsonD(document)
180	if err != nil {
181		return nil, fmt.Errorf("error getting extended BSON for document #%v: %v", c.index, err)
182	}
183	log.Logvf(log.DebugHigh, "got extended line: %#v", bsonD)
184	return bsonD, nil
185}
186
187// readJSONArraySeparator is a helper method used to process JSON arrays. It is
188// used to read any of the valid separators for a JSON array and flag invalid
189// characters.
190//
191// It will read a byte at a time until it finds an expected character after
192// which it returns control to the caller.
193//
194// It will also return immediately if it finds any error (including EOF). If it
195// reads a JSON_ARRAY_END byte, as a validity check it will continue to scan the
196// input source until it hits an error (including EOF) to ensure the entire
197// input source content is a valid JSON array
198func (r *JSONInputReader) readJSONArraySeparator() error {
199	r.expectedByte = json.ArraySep
200	if r.numProcessed == 0 {
201		r.expectedByte = json.ArrayStart
202	}
203
204	var readByte byte
205	scanp := 0
206
207	separatorReader := io.MultiReader(
208		r.decoder.Buffered(),
209		r.decoder.R,
210	)
211	for readByte != r.expectedByte {
212		n, err := separatorReader.Read(r.bytesFromReader)
213		scanp += n
214		if n == 0 || err != nil {
215			if err == io.EOF {
216				return ErrNoClosingBracket
217			}
218			return err
219		}
220		readByte = r.bytesFromReader[0]
221
222		if readByte == json.ArrayEnd {
223			// if we read the end of the JSON array, ensure we have no other
224			// non-whitespace characters at the end of the array
225			for {
226				_, err = separatorReader.Read(r.bytesFromReader)
227				if err != nil {
228					// takes care of the '[]' case
229					if !r.readOpeningBracket {
230						return ErrNoOpeningBracket
231					}
232					return err
233				}
234				readString := string(r.bytesFromReader[0])
235				if strings.TrimSpace(readString) != "" {
236					return fmt.Errorf("bad JSON array format - found '%v' "+
237						"after '%v' in input source", readString,
238						string(json.ArrayEnd))
239				}
240			}
241		}
242
243		// this will catch any invalid inter JSON object byte that occurs in the
244		// input source
245		if !(readByte == json.ArraySep ||
246			strings.TrimSpace(string(readByte)) == "" ||
247			readByte == json.ArrayStart ||
248			readByte == json.ArrayEnd) {
249			if r.expectedByte == json.ArrayStart {
250				return ErrNoOpeningBracket
251			}
252			return fmt.Errorf("bad JSON array format - found '%v' outside "+
253				"JSON object/array in input source", string(readByte))
254		}
255	}
256	// adjust the buffer to account for read bytes
257	if scanp < len(r.decoder.Buf) {
258		r.decoder.Buf = r.decoder.Buf[scanp:]
259	} else {
260		r.decoder.Buf = []byte{}
261	}
262	r.readOpeningBracket = true
263	return nil
264}
265