1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include <boost/intrusive_ptr.hpp>
34 #include <deque>
35 #include <vector>
36 
37 #include "mongo/bson/bsonmisc.h"
38 #include "mongo/bson/bsonobj.h"
39 #include "mongo/bson/bsonobjbuilder.h"
40 #include "mongo/db/pipeline/aggregation_context_fixture.h"
41 #include "mongo/db/pipeline/document.h"
42 #include "mongo/db/pipeline/document_source_change_stream.h"
43 #include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
44 #include "mongo/db/pipeline/document_source_mock.h"
45 #include "mongo/db/pipeline/document_value_test_util.h"
46 #include "mongo/db/pipeline/field_path.h"
47 #include "mongo/db/pipeline/stub_mongo_process_interface.h"
48 #include "mongo/db/pipeline/value.h"
49 
50 namespace mongo {
51 namespace {
52 using boost::intrusive_ptr;
53 using std::deque;
54 using std::vector;
55 
56 // This provides access to getExpCtx(), but we'll use a different name for this test suite.
57 class DocumentSourceLookupChangePostImageTest : public AggregationContextFixture {
58 public:
59     /**
60      * This method is required to avoid a static initialization fiasco resulting from calling
61      * UUID::gen() in file static scope.
62      */
testUuid()63     static const UUID& testUuid() {
64         static const UUID* uuid_gen = new UUID(UUID::gen());
65         return *uuid_gen;
66     }
67 
makeResumeToken(ImplicitValue id=Value ())68     Document makeResumeToken(ImplicitValue id = Value()) {
69         const Timestamp ts(100, 1);
70         if (id.missing()) {
71             ResumeTokenData tokenData;
72             tokenData.clusterTime = ts;
73             return ResumeToken(tokenData).toDocument();
74         }
75         return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid()))
76             .toDocument();
77     }
78 };
79 
80 /**
81  * A mock MongoProcessInterface which allows mocking a foreign pipeline.
82  */
83 class MockMongoProcessInterface final : public StubMongoProcessInterface {
84 public:
MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults)85     MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults)
86         : _mockResults(std::move(mockResults)) {}
87 
isSharded(const NamespaceString & ns)88     bool isSharded(const NamespaceString& ns) final {
89         return false;
90     }
91 
makePipeline(const std::vector<BSONObj> & rawPipeline,const boost::intrusive_ptr<ExpressionContext> & expCtx,const MakePipelineOptions opts=MakePipelineOptions{})92     StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
93         const std::vector<BSONObj>& rawPipeline,
94         const boost::intrusive_ptr<ExpressionContext>& expCtx,
95         const MakePipelineOptions opts = MakePipelineOptions{}) final {
96         auto pipeline = Pipeline::parse(rawPipeline, expCtx);
97         if (!pipeline.isOK()) {
98             return pipeline.getStatus();
99         }
100 
101         if (opts.optimize) {
102             pipeline.getValue()->optimizePipeline();
103         }
104 
105         if (opts.attachCursorSource) {
106             uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
107         }
108 
109         return pipeline;
110     }
111 
attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext> & expCtx,Pipeline * pipeline)112     Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
113                                         Pipeline* pipeline) final {
114         pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
115         return Status::OK();
116     }
117 
lookupSingleDocument(const NamespaceString & nss,UUID collectionUUID,const Document & documentKey,boost::optional<BSONObj> readConcern)118     boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
119                                                    UUID collectionUUID,
120                                                    const Document& documentKey,
121                                                    boost::optional<BSONObj> readConcern) {
122         boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
123         auto swPipeline = makePipeline({BSON("$match" << documentKey)}, expCtx);
124         if (swPipeline == ErrorCodes::NamespaceNotFound) {
125             return boost::none;
126         }
127         auto pipeline = uassertStatusOK(std::move(swPipeline));
128 
129         auto lookedUpDocument = pipeline->getNext();
130         if (auto next = pipeline->getNext()) {
131             uasserted(ErrorCodes::TooManyMatchingDocuments,
132                       str::stream() << "found more than one document matching "
133                                     << documentKey.toString()
134                                     << " ["
135                                     << lookedUpDocument->toString()
136                                     << ", "
137                                     << next->toString()
138                                     << "]");
139         }
140         return lookedUpDocument;
141     }
142 
143 private:
144     deque<DocumentSource::GetNextResult> _mockResults;
145 };
146 
TEST_F(DocumentSourceLookupChangePostImageTest,ShouldErrorIfMissingDocumentKeyOnUpdate)147 TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) {
148     auto expCtx = getExpCtx();
149 
150     // Set up the $lookup stage.
151     auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
152 
153     // Mock its input with a document without a "documentKey" field.
154     auto mockLocalSource = DocumentSourceMock::create(
155         Document{{"_id", makeResumeToken(0)},
156                  {"operationType", "update"_sd},
157                  {"fullDocument", Document{{"_id", 0}}},
158                  {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
159 
160     lookupChangeStage->setSource(mockLocalSource.get());
161 
162     // Mock out the foreign collection.
163     lookupChangeStage->injectMongoProcessInterface(
164         std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
165 
166     ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
167 }
168 
TEST_F(DocumentSourceLookupChangePostImageTest,ShouldErrorIfMissingOperationType)169 TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationType) {
170     auto expCtx = getExpCtx();
171 
172     // Set up the $lookup stage.
173     auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
174 
175     // Mock its input with a document without a "ns" field.
176     auto mockLocalSource = DocumentSourceMock::create(
177         Document{{"_id", makeResumeToken(0)},
178                  {"documentKey", Document{{"_id", 0}}},
179                  {"fullDocument", Document{{"_id", 0}}},
180                  {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
181 
182     lookupChangeStage->setSource(mockLocalSource.get());
183 
184     // Mock out the foreign collection.
185     lookupChangeStage->injectMongoProcessInterface(
186         std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
187 
188     ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
189 }
190 
TEST_F(DocumentSourceLookupChangePostImageTest,ShouldErrorIfMissingNamespace)191 TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) {
192     auto expCtx = getExpCtx();
193 
194     // Set up the $lookup stage.
195     auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
196 
197     // Mock its input with a document without a "ns" field.
198     auto mockLocalSource = DocumentSourceMock::create(Document{
199         {"_id", makeResumeToken(0)},
200         {"documentKey", Document{{"_id", 0}}},
201         {"operationType", "update"_sd},
202     });
203 
204     lookupChangeStage->setSource(mockLocalSource.get());
205 
206     // Mock out the foreign collection.
207     lookupChangeStage->injectMongoProcessInterface(
208         std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
209 
210     ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
211 }
212 
TEST_F(DocumentSourceLookupChangePostImageTest,ShouldErrorIfNsFieldHasWrongType)213 TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType) {
214     auto expCtx = getExpCtx();
215 
216     // Set up the $lookup stage.
217     auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
218 
219     // Mock its input with a document without a "ns" field.
220     auto mockLocalSource =
221         DocumentSourceMock::create(Document{{"_id", makeResumeToken(0)},
222                                             {"documentKey", Document{{"_id", 0}}},
223                                             {"operationType", "update"_sd},
224                                             {"ns", 4}});
225 
226     lookupChangeStage->setSource(mockLocalSource.get());
227 
228     // Mock out the foreign collection.
229     lookupChangeStage->injectMongoProcessInterface(
230         std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
231 
232     ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
233 }
234 
TEST_F(DocumentSourceLookupChangePostImageTest,ShouldErrorIfNsFieldDoesNotMatchPipeline)235 TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) {
236     auto expCtx = getExpCtx();
237 
238     // Set up the $lookup stage.
239     auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
240 
241     // Mock its input with a document without a "ns" field.
242     auto mockLocalSource = DocumentSourceMock::create(
243         Document{{"_id", makeResumeToken(0)},
244                  {"documentKey", Document{{"_id", 0}}},
245                  {"operationType", "update"_sd},
246                  {"ns", Document{{"db", "DIFFERENT"_sd}, {"coll", expCtx->ns.coll()}}}});
247 
248     lookupChangeStage->setSource(mockLocalSource.get());
249 
250     // Mock out the foreign collection.
251     lookupChangeStage->injectMongoProcessInterface(
252         std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
253 
254     ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579);
255 }
256 
TEST_F(DocumentSourceLookupChangePostImageTest,ShouldErrorIfDocumentKeyIsNotUnique)257 TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) {
258     auto expCtx = getExpCtx();
259 
260     // Set up the $lookup stage.
261     auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
262 
263     // Mock its input with an update document.
264     auto mockLocalSource = DocumentSourceMock::create(
265         Document{{"_id", makeResumeToken(0)},
266                  {"documentKey", Document{{"_id", 0}}},
267                  {"operationType", "update"_sd},
268                  {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
269 
270     lookupChangeStage->setSource(mockLocalSource.get());
271 
272     // Mock out the foreign collection to have two documents with the same document key.
273     deque<DocumentSource::GetNextResult> foreignCollection = {Document{{"_id", 0}},
274                                                               Document{{"_id", 0}}};
275     lookupChangeStage->injectMongoProcessInterface(
276         std::make_shared<MockMongoProcessInterface>(std::move(foreignCollection)));
277 
278     ASSERT_THROWS_CODE(
279         lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments);
280 }
281 
TEST_F(DocumentSourceLookupChangePostImageTest,ShouldPropagatePauses)282 TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
283     auto expCtx = getExpCtx();
284 
285     // Set up the $lookup stage.
286     auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
287 
288     // Mock its input, pausing every other result.
289     auto mockLocalSource = DocumentSourceMock::create(
290         {Document{{"_id", makeResumeToken(0)},
291                   {"documentKey", Document{{"_id", 0}}},
292                   {"operationType", "insert"_sd},
293                   {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}},
294                   {"fullDocument", Document{{"_id", 0}}}},
295          DocumentSource::GetNextResult::makePauseExecution(),
296          Document{{"_id", makeResumeToken(1)},
297                   {"documentKey", Document{{"_id", 1}}},
298                   {"operationType", "update"_sd},
299                   {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}},
300          DocumentSource::GetNextResult::makePauseExecution()});
301 
302     lookupChangeStage->setSource(mockLocalSource.get());
303 
304     // Mock out the foreign collection.
305     deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
306                                                              Document{{"_id", 1}}};
307     lookupChangeStage->injectMongoProcessInterface(
308         std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents)));
309 
310     auto next = lookupChangeStage->getNext();
311     ASSERT_TRUE(next.isAdvanced());
312     ASSERT_DOCUMENT_EQ(
313         next.releaseDocument(),
314         (Document{{"_id", makeResumeToken(0)},
315                   {"documentKey", Document{{"_id", 0}}},
316                   {"operationType", "insert"_sd},
317                   {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}},
318                   {"fullDocument", Document{{"_id", 0}}}}));
319 
320     ASSERT_TRUE(lookupChangeStage->getNext().isPaused());
321 
322     next = lookupChangeStage->getNext();
323     ASSERT_TRUE(next.isAdvanced());
324     ASSERT_DOCUMENT_EQ(
325         next.releaseDocument(),
326         (Document{{"_id", makeResumeToken(1)},
327                   {"documentKey", Document{{"_id", 1}}},
328                   {"operationType", "update"_sd},
329                   {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}},
330                   {"fullDocument", Document{{"_id", 1}}}}));
331 
332     ASSERT_TRUE(lookupChangeStage->getNext().isPaused());
333 
334     ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
335     ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
336 }
337 
338 }  // namespace
339 }  // namespace mongo
340