1// Copyright (C) MongoDB, Inc. 2014-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package mongoimport 8 9import ( 10 "errors" 11 "fmt" 12 "io" 13 "strings" 14 15 "github.com/mongodb/mongo-tools-common/bsonutil" 16 "github.com/mongodb/mongo-tools-common/json" 17 "github.com/mongodb/mongo-tools-common/log" 18 "go.mongodb.org/mongo-driver/bson" 19) 20 21// JSONInputReader is an implementation of InputReader that reads documents 22// in JSON format. 23type JSONInputReader struct { 24 // isArray indicates if the JSON import is an array of JSON documents 25 // or not 26 isArray bool 27 28 // decoder is used to read the next valid JSON documents from the input source 29 decoder *json.Decoder 30 31 // numProcessed indicates the number of JSON documents processed 32 numProcessed uint64 33 34 // readOpeningBracket indicates if the underlying io.Reader has consumed 35 // an opening bracket from the input source. Used to prevent errors when 36 // a JSON input source contains just '[]' 37 readOpeningBracket bool 38 39 // expectedByte is used to store the next expected valid character for JSON 40 // array imports 41 expectedByte byte 42 43 // bytesFromReader is used to store the next byte read from the Reader for 44 // JSON array imports 45 bytesFromReader []byte 46 47 // separatorReader is used for JSON arrays to look for a valid array 48 // separator. It is a reader consisting of the decoder's buffer and the 49 // underlying reader 50 separatorReader io.Reader 51 52 // embedded sizeTracker exposes the Size() method to check the number of bytes read so far 53 sizeTracker 54 55 // numDecoders is the number of concurrent goroutines to use for decoding 56 numDecoders int 57 58 // legacyExtJSON specifies whether or not the legacy extended JSON format should be used. 59 legacyExtJSON bool 60} 61 62// JSONConverter implements the Converter interface for JSON input. 63type JSONConverter struct { 64 data []byte 65 index uint64 66 legacyExtJSON bool 67} 68 69var ( 70 // ErrNoOpeningBracket means that the input source did not contain any 71 // opening brace - returned only if --jsonArray is passed in. 72 ErrNoOpeningBracket = errors.New("bad JSON array format - found no " + 73 "opening bracket '[' in input source") 74 75 // ErrNoClosingBracket means that the input source did not contain any 76 // closing brace - returned only if --jsonArray is passed in. 77 ErrNoClosingBracket = errors.New("bad JSON array format - found no " + 78 "closing bracket ']' in input source") 79) 80 81// NewJSONInputReader creates a new JSONInputReader in array mode if specified, 82// configured to read data to the given io.Reader. 83func NewJSONInputReader(isArray bool, legacyExtJSON bool, in io.Reader, numDecoders int) *JSONInputReader { 84 szCount := newSizeTrackingReader(newBomDiscardingReader(in)) 85 return &JSONInputReader{ 86 isArray: isArray, 87 sizeTracker: szCount, 88 decoder: json.NewDecoder(szCount), 89 readOpeningBracket: false, 90 bytesFromReader: make([]byte, 1), 91 numDecoders: numDecoders, 92 legacyExtJSON: legacyExtJSON, 93 } 94} 95 96// ReadAndValidateHeader is a no-op for JSON imports; always returns nil. 97func (r *JSONInputReader) ReadAndValidateHeader() error { 98 return nil 99} 100 101// ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil. 102func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error { 103 return nil 104} 105 106// StreamDocument takes a boolean indicating if the documents should be streamed 107// in read order and a channel on which to stream the documents processed from 108// the underlying reader. Returns a non-nil error if encountered 109func (r *JSONInputReader) StreamDocument(ordered bool, readChan chan bson.D) (retErr error) { 110 rawChan := make(chan Converter, r.numDecoders) 111 jsonErrChan := make(chan error) 112 113 // begin reading from source 114 go func() { 115 var err error 116 for { 117 if r.isArray { 118 if err = r.readJSONArraySeparator(); err != nil { 119 close(rawChan) 120 if err == io.EOF { 121 jsonErrChan <- nil 122 } else { 123 r.numProcessed++ 124 jsonErrChan <- fmt.Errorf("error reading separator after document #%v: %v", r.numProcessed, err) 125 } 126 return 127 } 128 } 129 rawBytes, err := r.decoder.ScanObject() 130 if err != nil { 131 close(rawChan) 132 if err == io.EOF { 133 jsonErrChan <- nil 134 } else { 135 r.numProcessed++ 136 jsonErrChan <- fmt.Errorf("error processing document #%v: %v", r.numProcessed, err) 137 } 138 return 139 } 140 rawChan <- JSONConverter{ 141 data: rawBytes, 142 index: r.numProcessed, 143 legacyExtJSON: r.legacyExtJSON, 144 } 145 r.numProcessed++ 146 } 147 }() 148 149 // begin processing read bytes 150 go func() { 151 jsonErrChan <- streamDocuments(ordered, r.numDecoders, rawChan, readChan) 152 }() 153 154 return channelQuorumError(jsonErrChan, 2) 155} 156 157// Convert implements the Converter interface for JSON input. It converts a 158// JSONConverter struct to a BSON document. 159func (c JSONConverter) Convert() (bson.D, error) { 160 if c.legacyExtJSON { 161 return c.convertLegacyExtJSON() 162 } 163 164 var doc bson.D 165 if err := bson.UnmarshalExtJSON(c.data, false, &doc); err != nil { 166 return nil, err 167 } 168 169 return doc, nil 170} 171 172func (c JSONConverter) convertLegacyExtJSON() (bson.D, error) { 173 document, err := json.UnmarshalBsonD(c.data) 174 if err != nil { 175 return nil, fmt.Errorf("error unmarshaling bytes on document #%v: %v", c.index, err) 176 } 177 log.Logvf(log.DebugHigh, "got line: %v", document) 178 179 bsonD, err := bsonutil.GetExtendedBsonD(document) 180 if err != nil { 181 return nil, fmt.Errorf("error getting extended BSON for document #%v: %v", c.index, err) 182 } 183 log.Logvf(log.DebugHigh, "got extended line: %#v", bsonD) 184 return bsonD, nil 185} 186 187// readJSONArraySeparator is a helper method used to process JSON arrays. It is 188// used to read any of the valid separators for a JSON array and flag invalid 189// characters. 190// 191// It will read a byte at a time until it finds an expected character after 192// which it returns control to the caller. 193// 194// It will also return immediately if it finds any error (including EOF). If it 195// reads a JSON_ARRAY_END byte, as a validity check it will continue to scan the 196// input source until it hits an error (including EOF) to ensure the entire 197// input source content is a valid JSON array 198func (r *JSONInputReader) readJSONArraySeparator() error { 199 r.expectedByte = json.ArraySep 200 if r.numProcessed == 0 { 201 r.expectedByte = json.ArrayStart 202 } 203 204 var readByte byte 205 scanp := 0 206 207 separatorReader := io.MultiReader( 208 r.decoder.Buffered(), 209 r.decoder.R, 210 ) 211 for readByte != r.expectedByte { 212 n, err := separatorReader.Read(r.bytesFromReader) 213 scanp += n 214 if n == 0 || err != nil { 215 if err == io.EOF { 216 return ErrNoClosingBracket 217 } 218 return err 219 } 220 readByte = r.bytesFromReader[0] 221 222 if readByte == json.ArrayEnd { 223 // if we read the end of the JSON array, ensure we have no other 224 // non-whitespace characters at the end of the array 225 for { 226 _, err = separatorReader.Read(r.bytesFromReader) 227 if err != nil { 228 // takes care of the '[]' case 229 if !r.readOpeningBracket { 230 return ErrNoOpeningBracket 231 } 232 return err 233 } 234 readString := string(r.bytesFromReader[0]) 235 if strings.TrimSpace(readString) != "" { 236 return fmt.Errorf("bad JSON array format - found '%v' "+ 237 "after '%v' in input source", readString, 238 string(json.ArrayEnd)) 239 } 240 } 241 } 242 243 // this will catch any invalid inter JSON object byte that occurs in the 244 // input source 245 if !(readByte == json.ArraySep || 246 strings.TrimSpace(string(readByte)) == "" || 247 readByte == json.ArrayStart || 248 readByte == json.ArrayEnd) { 249 if r.expectedByte == json.ArrayStart { 250 return ErrNoOpeningBracket 251 } 252 return fmt.Errorf("bad JSON array format - found '%v' outside "+ 253 "JSON object/array in input source", string(readByte)) 254 } 255 } 256 // adjust the buffer to account for read bytes 257 if scanp < len(r.decoder.Buf) { 258 r.decoder.Buf = r.decoder.Buf[scanp:] 259 } else { 260 r.decoder.Buf = []byte{} 261 } 262 r.readOpeningBracket = true 263 return nil 264} 265