1// Copyright (C) MongoDB, Inc. 2014-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package db
8
9import (
10	"fmt"
11	"gopkg.in/mgo.v2/bson"
12	"io"
13)
14
// BSONSource reads raw BSON documents from Stream, an io.ReadCloser that wraps
// a stream of BSON documents.
//
// reusableBuf is the I/O buffer shared by successive LoadNext calls; it is nil
// when the source was built with NewBufferlessBSONSource. err holds the error
// state left behind by the most recent LoadNext call and is exposed via Err.
type BSONSource struct {
	reusableBuf []byte
	Stream      io.ReadCloser
	err         error
}
22
// DecodedBSONSource wraps a RawDocSource and unmarshals the raw documents it
// yields. The err field records the most recent unmarshal failure; it is kept
// separately from the error state of the embedded RawDocSource.
type DecodedBSONSource struct {
	RawDocSource
	err error
}
29
// RawDocSource wraps basic functions for reading a BSON source file.
type RawDocSource interface {
	// LoadNext returns the raw bytes of the next document. DecodedBSONSource
	// treats a nil result as "no document available".
	LoadNext() []byte
	// Close closes the source and returns an error, if any.
	Close() error
	// Err returns the source's current error state.
	Err() error
}
36
37// NewBSONSource creates a BSONSource with a reusable I/O buffer
38func NewBSONSource(in io.ReadCloser) *BSONSource {
39	return &BSONSource{make([]byte, MaxBSONSize), in, nil}
40}
41
42// NewBufferlessBSONSource creates a BSONSource without a reusable I/O buffer
43func NewBufferlessBSONSource(in io.ReadCloser) *BSONSource {
44	return &BSONSource{nil, in, nil}
45}
46
// Close closes the BSONSource by closing its underlying Stream, rendering it
// unusable for further I/O. It returns the error reported by Stream.Close, if
// any.
func (bs *BSONSource) Close() error {
	return bs.Stream.Close()
}
52
53func NewDecodedBSONSource(ds RawDocSource) *DecodedBSONSource {
54	return &DecodedBSONSource{ds, nil}
55}
56
57// Err returns any error in the DecodedBSONSource or its RawDocSource.
58func (dbs *DecodedBSONSource) Err() error {
59	if dbs.err != nil {
60		return dbs.err
61	}
62	return dbs.RawDocSource.Err()
63}
64
65// Next unmarshals the next BSON document into result. Returns true if no errors
66// are encountered and false otherwise.
67func (dbs *DecodedBSONSource) Next(result interface{}) bool {
68	doc := dbs.LoadNext()
69	if doc == nil {
70		return false
71	}
72	if err := bson.Unmarshal(doc, result); err != nil {
73		dbs.err = err
74		return false
75	}
76	dbs.err = nil
77	return true
78}
79
80// LoadNext reads and returns the next BSON document in the stream. If the
81// BSONSource was created with NewBSONSource then each returned []byte will be
82// a slice of a single reused I/O buffer. If the BSONSource was created with
83// NewBufferlessBSONSource then each returend []byte will be individually
84// allocated
85func (bs *BSONSource) LoadNext() []byte {
86	var into []byte
87	if bs.reusableBuf == nil {
88		into = make([]byte, 4)
89	} else {
90		into = bs.reusableBuf
91	}
92	// read the bson object size (a 4 byte integer)
93	_, err := io.ReadAtLeast(bs.Stream, into[0:4], 4)
94	if err != nil {
95		if err != io.EOF {
96			bs.err = err
97			return nil
98		}
99		// we hit EOF right away, so we're at the end of the stream.
100		bs.err = nil
101		return nil
102	}
103
104	bsonSize := int32(
105		(uint32(into[0]) << 0) |
106			(uint32(into[1]) << 8) |
107			(uint32(into[2]) << 16) |
108			(uint32(into[3]) << 24),
109	)
110
111	// Verify that the size of the BSON object we are about to read can
112	// actually fit into the buffer that was provided. If not, either the BSON is
113	// invalid, or the buffer passed in is too small.
114	// Verify that we do not have an invalid BSON document with size < 5.
115	if bsonSize > MaxBSONSize || bsonSize < 5 {
116		bs.err = fmt.Errorf("invalid BSONSize: %v bytes", bsonSize)
117		return nil
118	}
119	if int(bsonSize) > cap(into) {
120		bigInto := make([]byte, bsonSize)
121		copy(bigInto, into)
122		into = bigInto
123		if bs.reusableBuf != nil {
124			bs.reusableBuf = bigInto
125		}
126	}
127	into = into[:int(bsonSize)]
128	_, err = io.ReadAtLeast(bs.Stream, into[4:], int(bsonSize-4))
129	if err != nil {
130		if err != io.EOF {
131			bs.err = err
132			return nil
133		}
134		// this case means we hit EOF but read a partial document,
135		// so there's a broken doc in the stream. Treat this as error.
136		bs.err = fmt.Errorf("invalid bson: %v", err)
137		return nil
138	}
139
140	bs.err = nil
141	return into
142}
143
// Err returns the error state left by the most recent LoadNext call. It is nil
// after a successful read and after a clean end of stream.
func (bs *BSONSource) Err() error {
	return bs.err
}
147