1// Copyright (C) MongoDB, Inc. 2014-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package db 8 9import ( 10 "fmt" 11 "gopkg.in/mgo.v2/bson" 12 "io" 13) 14 15// BSONSource reads documents from the underlying io.ReadCloser, Stream which 16// wraps a stream of BSON documents. 17type BSONSource struct { 18 reusableBuf []byte 19 Stream io.ReadCloser 20 err error 21} 22 23// DecodedBSONSource reads documents from the underlying io.ReadCloser, Stream which 24// wraps a stream of BSON documents. 25type DecodedBSONSource struct { 26 RawDocSource 27 err error 28} 29 30// RawDocSource wraps basic functions for reading a BSON source file. 31type RawDocSource interface { 32 LoadNext() []byte 33 Close() error 34 Err() error 35} 36 37// NewBSONSource creates a BSONSource with a reusable I/O buffer 38func NewBSONSource(in io.ReadCloser) *BSONSource { 39 return &BSONSource{make([]byte, MaxBSONSize), in, nil} 40} 41 42// NewBufferlessBSONSource creates a BSONSource without a reusable I/O buffer 43func NewBufferlessBSONSource(in io.ReadCloser) *BSONSource { 44 return &BSONSource{nil, in, nil} 45} 46 47// Close closes the BSONSource, rendering it unusable for I/O. 48// It returns an error, if any. 49func (bs *BSONSource) Close() error { 50 return bs.Stream.Close() 51} 52 53func NewDecodedBSONSource(ds RawDocSource) *DecodedBSONSource { 54 return &DecodedBSONSource{ds, nil} 55} 56 57// Err returns any error in the DecodedBSONSource or its RawDocSource. 58func (dbs *DecodedBSONSource) Err() error { 59 if dbs.err != nil { 60 return dbs.err 61 } 62 return dbs.RawDocSource.Err() 63} 64 65// Next unmarshals the next BSON document into result. Returns true if no errors 66// are encountered and false otherwise. 67func (dbs *DecodedBSONSource) Next(result interface{}) bool { 68 doc := dbs.LoadNext() 69 if doc == nil { 70 return false 71 } 72 if err := bson.Unmarshal(doc, result); err != nil { 73 dbs.err = err 74 return false 75 } 76 dbs.err = nil 77 return true 78} 79 80// LoadNext reads and returns the next BSON document in the stream. If the 81// BSONSource was created with NewBSONSource then each returned []byte will be 82// a slice of a single reused I/O buffer. If the BSONSource was created with 83// NewBufferlessBSONSource then each returend []byte will be individually 84// allocated 85func (bs *BSONSource) LoadNext() []byte { 86 var into []byte 87 if bs.reusableBuf == nil { 88 into = make([]byte, 4) 89 } else { 90 into = bs.reusableBuf 91 } 92 // read the bson object size (a 4 byte integer) 93 _, err := io.ReadAtLeast(bs.Stream, into[0:4], 4) 94 if err != nil { 95 if err != io.EOF { 96 bs.err = err 97 return nil 98 } 99 // we hit EOF right away, so we're at the end of the stream. 100 bs.err = nil 101 return nil 102 } 103 104 bsonSize := int32( 105 (uint32(into[0]) << 0) | 106 (uint32(into[1]) << 8) | 107 (uint32(into[2]) << 16) | 108 (uint32(into[3]) << 24), 109 ) 110 111 // Verify that the size of the BSON object we are about to read can 112 // actually fit into the buffer that was provided. If not, either the BSON is 113 // invalid, or the buffer passed in is too small. 114 // Verify that we do not have an invalid BSON document with size < 5. 115 if bsonSize > MaxBSONSize || bsonSize < 5 { 116 bs.err = fmt.Errorf("invalid BSONSize: %v bytes", bsonSize) 117 return nil 118 } 119 if int(bsonSize) > cap(into) { 120 bigInto := make([]byte, bsonSize) 121 copy(bigInto, into) 122 into = bigInto 123 if bs.reusableBuf != nil { 124 bs.reusableBuf = bigInto 125 } 126 } 127 into = into[:int(bsonSize)] 128 _, err = io.ReadAtLeast(bs.Stream, into[4:], int(bsonSize-4)) 129 if err != nil { 130 if err != io.EOF { 131 bs.err = err 132 return nil 133 } 134 // this case means we hit EOF but read a partial document, 135 // so there's a broken doc in the stream. Treat this as error. 136 bs.err = fmt.Errorf("invalid bson: %v", err) 137 return nil 138 } 139 140 bs.err = nil 141 return into 142} 143 144func (bs *BSONSource) Err() error { 145 return bs.err 146} 147