1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include <boost/intrusive_ptr.hpp>
34 #include <deque>
35 #include <vector>
36
37 #include "mongo/bson/bsonmisc.h"
38 #include "mongo/bson/bsonobj.h"
39 #include "mongo/bson/bsonobjbuilder.h"
40 #include "mongo/db/pipeline/aggregation_context_fixture.h"
41 #include "mongo/db/pipeline/document.h"
42 #include "mongo/db/pipeline/document_source_change_stream.h"
43 #include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
44 #include "mongo/db/pipeline/document_source_mock.h"
45 #include "mongo/db/pipeline/document_value_test_util.h"
46 #include "mongo/db/pipeline/field_path.h"
47 #include "mongo/db/pipeline/stub_mongo_process_interface.h"
48 #include "mongo/db/pipeline/value.h"
49
50 namespace mongo {
51 namespace {
52 using boost::intrusive_ptr;
53 using std::deque;
54 using std::vector;
55
56 // This provides access to getExpCtx(), but we'll use a different name for this test suite.
57 class DocumentSourceLookupChangePostImageTest : public AggregationContextFixture {
58 public:
59 /**
60 * This method is required to avoid a static initialization fiasco resulting from calling
61 * UUID::gen() in file static scope.
62 */
testUuid()63 static const UUID& testUuid() {
64 static const UUID* uuid_gen = new UUID(UUID::gen());
65 return *uuid_gen;
66 }
67
makeResumeToken(ImplicitValue id=Value ())68 Document makeResumeToken(ImplicitValue id = Value()) {
69 const Timestamp ts(100, 1);
70 if (id.missing()) {
71 ResumeTokenData tokenData;
72 tokenData.clusterTime = ts;
73 return ResumeToken(tokenData).toDocument();
74 }
75 return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid()))
76 .toDocument();
77 }
78 };
79
80 /**
81 * A mock MongoProcessInterface which allows mocking a foreign pipeline.
82 */
83 class MockMongoProcessInterface final : public StubMongoProcessInterface {
84 public:
MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults)85 MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults)
86 : _mockResults(std::move(mockResults)) {}
87
isSharded(const NamespaceString & ns)88 bool isSharded(const NamespaceString& ns) final {
89 return false;
90 }
91
makePipeline(const std::vector<BSONObj> & rawPipeline,const boost::intrusive_ptr<ExpressionContext> & expCtx,const MakePipelineOptions opts=MakePipelineOptions{})92 StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
93 const std::vector<BSONObj>& rawPipeline,
94 const boost::intrusive_ptr<ExpressionContext>& expCtx,
95 const MakePipelineOptions opts = MakePipelineOptions{}) final {
96 auto pipeline = Pipeline::parse(rawPipeline, expCtx);
97 if (!pipeline.isOK()) {
98 return pipeline.getStatus();
99 }
100
101 if (opts.optimize) {
102 pipeline.getValue()->optimizePipeline();
103 }
104
105 if (opts.attachCursorSource) {
106 uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
107 }
108
109 return pipeline;
110 }
111
attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext> & expCtx,Pipeline * pipeline)112 Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
113 Pipeline* pipeline) final {
114 pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
115 return Status::OK();
116 }
117
lookupSingleDocument(const NamespaceString & nss,UUID collectionUUID,const Document & documentKey,boost::optional<BSONObj> readConcern)118 boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
119 UUID collectionUUID,
120 const Document& documentKey,
121 boost::optional<BSONObj> readConcern) {
122 boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
123 auto swPipeline = makePipeline({BSON("$match" << documentKey)}, expCtx);
124 if (swPipeline == ErrorCodes::NamespaceNotFound) {
125 return boost::none;
126 }
127 auto pipeline = uassertStatusOK(std::move(swPipeline));
128
129 auto lookedUpDocument = pipeline->getNext();
130 if (auto next = pipeline->getNext()) {
131 uasserted(ErrorCodes::TooManyMatchingDocuments,
132 str::stream() << "found more than one document matching "
133 << documentKey.toString()
134 << " ["
135 << lookedUpDocument->toString()
136 << ", "
137 << next->toString()
138 << "]");
139 }
140 return lookedUpDocument;
141 }
142
143 private:
144 deque<DocumentSource::GetNextResult> _mockResults;
145 };
146
// An update event that has no "documentKey" field must make the stage fail with code 40578.
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) {
    auto expCtx = getExpCtx();

    // The stage under test.
    auto postImageStage = DocumentSourceLookupChangePostImage::create(expCtx);

    // Its only input: an update event which lacks the "documentKey" field.
    const Document eventWithoutDocumentKey{
        {"_id", makeResumeToken(0)},
        {"operationType", "update"_sd},
        {"fullDocument", Document{{"_id", 0}}},
        {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}};
    auto eventSource = DocumentSourceMock::create(eventWithoutDocumentKey);
    postImageStage->setSource(eventSource.get());

    // The mocked foreign collection is empty.
    auto emptyCollection =
        std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
    postImageStage->injectMongoProcessInterface(emptyCollection);

    ASSERT_THROWS_CODE(postImageStage->getNext(), AssertionException, 40578);
}
168
// An event that has no "operationType" field must make the stage fail with code 40578.
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationType) {
    auto expCtx = getExpCtx();

    // The stage under test.
    auto postImageStage = DocumentSourceLookupChangePostImage::create(expCtx);

    // Its only input: an event which lacks the "operationType" field but is otherwise complete.
    const Document eventWithoutOperationType{
        {"_id", makeResumeToken(0)},
        {"documentKey", Document{{"_id", 0}}},
        {"fullDocument", Document{{"_id", 0}}},
        {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}};
    auto eventSource = DocumentSourceMock::create(eventWithoutOperationType);
    postImageStage->setSource(eventSource.get());

    // The mocked foreign collection is empty.
    auto emptyCollection =
        std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
    postImageStage->injectMongoProcessInterface(emptyCollection);

    ASSERT_THROWS_CODE(postImageStage->getNext(), AssertionException, 40578);
}
190
// An update event that has no "ns" field must make the stage fail with code 40578.
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) {
    auto expCtx = getExpCtx();

    // The stage under test.
    auto postImageStage = DocumentSourceLookupChangePostImage::create(expCtx);

    // Its only input: an update event which lacks the "ns" field.
    const Document eventWithoutNs{
        {"_id", makeResumeToken(0)},
        {"documentKey", Document{{"_id", 0}}},
        {"operationType", "update"_sd},
    };
    auto eventSource = DocumentSourceMock::create(eventWithoutNs);
    postImageStage->setSource(eventSource.get());

    // The mocked foreign collection is empty.
    auto emptyCollection =
        std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
    postImageStage->injectMongoProcessInterface(emptyCollection);

    ASSERT_THROWS_CODE(postImageStage->getNext(), AssertionException, 40578);
}
212
// An update event whose "ns" field is not a document must make the stage fail with code 40578.
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType) {
    auto expCtx = getExpCtx();

    // The stage under test.
    auto postImageStage = DocumentSourceLookupChangePostImage::create(expCtx);

    // Its only input: an update event whose "ns" field holds the number 4 instead of a document.
    const Document eventWithNumericNs{{"_id", makeResumeToken(0)},
                                      {"documentKey", Document{{"_id", 0}}},
                                      {"operationType", "update"_sd},
                                      {"ns", 4}};
    auto eventSource = DocumentSourceMock::create(eventWithNumericNs);
    postImageStage->setSource(eventSource.get());

    // The mocked foreign collection is empty.
    auto emptyCollection =
        std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
    postImageStage->injectMongoProcessInterface(emptyCollection);

    ASSERT_THROWS_CODE(postImageStage->getNext(), AssertionException, 40578);
}
234
// An update event whose "ns" names a different database than the expression context's namespace
// must make the stage fail with code 40579.
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) {
    auto expCtx = getExpCtx();

    // The stage under test.
    auto postImageStage = DocumentSourceLookupChangePostImage::create(expCtx);

    // Its only input: an update event whose "ns.db" is "DIFFERENT" while "ns.coll" still matches
    // the expression context.
    const Document eventFromOtherDb{
        {"_id", makeResumeToken(0)},
        {"documentKey", Document{{"_id", 0}}},
        {"operationType", "update"_sd},
        {"ns", Document{{"db", "DIFFERENT"_sd}, {"coll", expCtx->ns.coll()}}}};
    auto eventSource = DocumentSourceMock::create(eventFromOtherDb);
    postImageStage->setSource(eventSource.get());

    // The mocked foreign collection is empty.
    auto emptyCollection =
        std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
    postImageStage->injectMongoProcessInterface(emptyCollection);

    ASSERT_THROWS_CODE(postImageStage->getNext(), AssertionException, 40579);
}
256
// If the foreign collection yields two documents for one document key, the stage must fail with
// TooManyMatchingDocuments.
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) {
    auto expCtx = getExpCtx();

    // The stage under test.
    auto postImageStage = DocumentSourceLookupChangePostImage::create(expCtx);

    // Its only input: a well-formed update event for the document key {_id: 0}.
    const Document updateEvent{
        {"_id", makeResumeToken(0)},
        {"documentKey", Document{{"_id", 0}}},
        {"operationType", "update"_sd},
        {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}};
    auto eventSource = DocumentSourceMock::create(updateEvent);
    postImageStage->setSource(eventSource.get());

    // The mocked foreign collection holds two documents sharing that same document key.
    deque<DocumentSource::GetNextResult> duplicatedDocuments = {Document{{"_id", 0}},
                                                                Document{{"_id", 0}}};
    auto foreignInterface =
        std::make_shared<MockMongoProcessInterface>(std::move(duplicatedDocuments));
    postImageStage->injectMongoProcessInterface(foreignInterface);

    ASSERT_THROWS_CODE(
        postImageStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments);
}
281
// Pauses coming from the stage's source must be surfaced in place, interleaved with the events,
// and the stage must keep reporting EOF once its source is exhausted.
TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
    auto expCtx = getExpCtx();

    // The stage under test.
    auto postImageStage = DocumentSourceLookupChangePostImage::create(expCtx);

    // The "ns" sub-document shared by every event in this test.
    const Document nsField{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}};

    // The insert already carries a "fullDocument"; it is expected to come out exactly as it
    // went in.
    const Document insertEvent{{"_id", makeResumeToken(0)},
                               {"documentKey", Document{{"_id", 0}}},
                               {"operationType", "insert"_sd},
                               {"ns", nsField},
                               {"fullDocument", Document{{"_id", 0}}}};

    // The update arrives without a "fullDocument" and is expected to leave with one.
    const Document updateEvent{{"_id", makeResumeToken(1)},
                               {"documentKey", Document{{"_id", 1}}},
                               {"operationType", "update"_sd},
                               {"ns", nsField}};
    const Document updateEventWithPostImage{{"_id", makeResumeToken(1)},
                                            {"documentKey", Document{{"_id", 1}}},
                                            {"operationType", "update"_sd},
                                            {"ns", nsField},
                                            {"fullDocument", Document{{"_id", 1}}}};

    // Feed the stage the two events, each followed by a pause.
    auto eventSource =
        DocumentSourceMock::create({insertEvent,
                                    DocumentSource::GetNextResult::makePauseExecution(),
                                    updateEvent,
                                    DocumentSource::GetNextResult::makePauseExecution()});
    postImageStage->setSource(eventSource.get());

    // The mocked foreign collection contains the current version of both documents.
    deque<DocumentSource::GetNextResult> foreignDocuments{Document{{"_id", 0}},
                                                          Document{{"_id", 1}}};
    postImageStage->injectMongoProcessInterface(
        std::make_shared<MockMongoProcessInterface>(std::move(foreignDocuments)));

    // First the insert, then the pause that followed it.
    auto result = postImageStage->getNext();
    ASSERT_TRUE(result.isAdvanced());
    ASSERT_DOCUMENT_EQ(result.releaseDocument(), insertEvent);

    ASSERT_TRUE(postImageStage->getNext().isPaused());

    // Then the update, now with its post-image, and the second pause.
    result = postImageStage->getNext();
    ASSERT_TRUE(result.isAdvanced());
    ASSERT_DOCUMENT_EQ(result.releaseDocument(), updateEventWithPostImage);

    ASSERT_TRUE(postImageStage->getNext().isPaused());

    // Nothing is left; asking again must still report EOF.
    ASSERT_TRUE(postImageStage->getNext().isEOF());
    ASSERT_TRUE(postImageStage->getNext().isEOF());
}
337
338 } // namespace
339 } // namespace mongo
340