1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/client/remote_command_targeter_factory_mock.h"
34 #include "mongo/client/remote_command_targeter_mock.h"
35 #include "mongo/db/logical_session_id.h"
36 #include "mongo/s/catalog/type_shard.h"
37 #include "mongo/s/client/shard_registry.h"
38 #include "mongo/s/sharding_test_fixture.h"
39 #include "mongo/s/write_ops/batch_write_exec.h"
40 #include "mongo/s/write_ops/batched_command_request.h"
41 #include "mongo/s/write_ops/batched_command_response.h"
42 #include "mongo/s/write_ops/mock_ns_targeter.h"
43 #include "mongo/stdx/memory.h"
44 #include "mongo/unittest/unittest.h"
45
46 namespace mongo {
47 namespace {
48
49 const HostAndPort kTestShardHost = HostAndPort("FakeHost", 12345);
50 const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
51 const std::string shardName = "FakeShard";
52 const int kMaxRoundsWithoutProgress = 5;
53
54 /**
55 * Mimics a single shard backend for a particular collection which can be initialized with a
56 * set of write command results to return.
57 */
58 class BatchWriteExecTest : public ShardingTestFixture {
59 public:
60 BatchWriteExecTest() = default;
61 ~BatchWriteExecTest() = default;
62
setUp()63 void setUp() override {
64 ShardingTestFixture::setUp();
65 setRemote(HostAndPort("ClientHost", 12345));
66
67 // Set up the RemoteCommandTargeter for the config shard.
68 configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
69
70 // Add a RemoteCommandTargeter for the data shard.
71 std::unique_ptr<RemoteCommandTargeterMock> targeter(
72 stdx::make_unique<RemoteCommandTargeterMock>());
73 targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost));
74 targeter->setFindHostReturnValue(kTestShardHost);
75 targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost),
76 std::move(targeter));
77
78 // Set up the shard registry to contain the fake shard.
79 ShardType shardType;
80 shardType.setName(shardName);
81 shardType.setHost(kTestShardHost.toString());
82 std::vector<ShardType> shards{shardType};
83 setupShards(shards);
84
85 // Set up the namespace targeter to target the fake shard.
86 nsTargeter.init(nss,
87 {MockRange(ShardEndpoint(shardName, ChunkVersion::IGNORED()),
88 BSON("x" << MINKEY),
89 BSON("x" << MAXKEY))});
90 }
91
expectInsertsReturnSuccess(const std::vector<BSONObj> & expected)92 void expectInsertsReturnSuccess(const std::vector<BSONObj>& expected) {
93 expectInsertsReturnSuccess(expected.begin(), expected.end());
94 }
95
expectInsertsReturnSuccess(std::vector<BSONObj>::const_iterator expectedFrom,std::vector<BSONObj>::const_iterator expectedTo)96 void expectInsertsReturnSuccess(std::vector<BSONObj>::const_iterator expectedFrom,
97 std::vector<BSONObj>::const_iterator expectedTo) {
98 onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
99 ASSERT_EQUALS(nss.db(), request.dbname);
100
101 const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
102 const auto actualBatchedInsert(BatchedCommandRequest::parseInsert(opMsgRequest));
103 ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().ns());
104
105 const auto& inserted = actualBatchedInsert.getInsertRequest().getDocuments();
106 const size_t expectedSize = std::distance(expectedFrom, expectedTo);
107 ASSERT_EQUALS(expectedSize, inserted.size());
108
109 auto itInserted = inserted.begin();
110 auto itExpected = expectedFrom;
111
112 for (; itInserted != inserted.end(); itInserted++, itExpected++) {
113 ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
114 }
115
116 BatchedCommandResponse response;
117 response.setOk(true);
118 response.setN(inserted.size());
119
120 return response.toBSON();
121 });
122 }
123
expectInsertsReturnStaleVersionErrors(const std::vector<BSONObj> & expected)124 void expectInsertsReturnStaleVersionErrors(const std::vector<BSONObj>& expected) {
125 onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
126 ASSERT_EQUALS(nss.db(), request.dbname);
127
128 const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
129 const auto actualBatchedInsert(BatchedCommandRequest::parseInsert(opMsgRequest));
130 ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().ns());
131
132 const auto& inserted = actualBatchedInsert.getInsertRequest().getDocuments();
133 ASSERT_EQUALS(expected.size(), inserted.size());
134
135 auto itInserted = inserted.begin();
136 auto itExpected = expected.begin();
137
138 for (; itInserted != inserted.end(); itInserted++, itExpected++) {
139 ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
140 }
141
142 BatchedCommandResponse staleResponse;
143 staleResponse.setOk(true);
144 staleResponse.setN(0);
145
146 // Report a stale version error for each write in the batch.
147 int i = 0;
148 for (itInserted = inserted.begin(); itInserted != inserted.end(); ++itInserted) {
149 WriteErrorDetail* error = new WriteErrorDetail;
150 error->setErrCode(ErrorCodes::StaleShardVersion);
151 error->setErrMessage("mock stale error");
152 error->setIndex(i);
153
154 staleResponse.addToErrDetails(error);
155 ++i;
156 }
157
158 return staleResponse.toBSON();
159 });
160 }
161
expectInsertsReturnError(const std::vector<BSONObj> & expected,const BatchedCommandResponse & errResponse)162 void expectInsertsReturnError(const std::vector<BSONObj>& expected,
163 const BatchedCommandResponse& errResponse) {
164 onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
165 ASSERT_EQUALS(nss.db(), request.dbname);
166
167 const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
168 const auto actualBatchedInsert(BatchedCommandRequest::parseInsert(opMsgRequest));
169 ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().ns());
170
171 const auto& inserted = actualBatchedInsert.getInsertRequest().getDocuments();
172 ASSERT_EQUALS(expected.size(), inserted.size());
173
174 auto itInserted = inserted.begin();
175 auto itExpected = expected.begin();
176
177 for (; itInserted != inserted.end(); itInserted++, itExpected++) {
178 ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
179 }
180
181 return errResponse.toBSON();
182 });
183 }
184
185 ConnectionString shardHost{kTestShardHost};
186 NamespaceString nss{"foo.bar"};
187
188 MockNSTargeter nsTargeter;
189 };
190
191 //
192 // Tests for the BatchWriteExec
193 //
194
TEST_F(BatchWriteExecTest, SingleOp) {
    // Unordered insert of a single document; it targets the only shard and succeeds in one round.
    const std::vector<BSONObj> docs{BSON("x" << 1)};

    write_ops::WriteCommandBase commandBase;
    commandBase.setOrdered(false);

    write_ops::Insert insertOp(nss);
    insertOp.setWriteCommandBase(std::move(commandBase));
    insertOp.setDocuments(docs);

    BatchedCommandRequest request(std::move(insertOp));
    request.setWriteConcern(BSONObj());

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
        ASSERT(response.getOk());
        ASSERT_EQ(1LL, response.getN());
        ASSERT_EQ(1, stats.numRounds);
    });

    // The shard receives the one document and acknowledges it.
    expectInsertsReturnSuccess(docs);

    future.timed_get(kFutureTimeout);
}
222
TEST_F(BatchWriteExecTest, MultiOpLarge) {
    // An ordered insert too large to send to the shard as a single batch: the executor must split
    // it into exactly two rounds, and every document must be inserted.
    const int kNumDocsToInsert = 100'000;
    const std::string kDocValue(200, 'x');

    // Number of documents the executor is expected to put in the first batch. This value depends
    // on the exact size of the documents generated below; changing kDocValue or the document shape
    // requires recomputing it.
    const int kFirstBatchSize = 66576;

    std::vector<BSONObj> docsToInsert;
    docsToInsert.reserve(kNumDocsToInsert);
    for (int i = 0; i < kNumDocsToInsert; i++) {
        docsToInsert.push_back(BSON("_id" << i << "someLargeKeyToWasteSpace" << kDocValue));
    }

    BatchedCommandRequest request([&] {
        write_ops::Insert insertOp(nss);
        insertOp.setWriteCommandBase([] {
            write_ops::WriteCommandBase writeCommandBase;
            writeCommandBase.setOrdered(true);
            return writeCommandBase;
        }());
        insertOp.setDocuments(docsToInsert);
        return insertOp;
    }());
    request.setWriteConcern(BSONObj());

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);

        ASSERT(response.getOk());
        ASSERT_EQUALS(kNumDocsToInsert, response.getN());
        ASSERT_EQUALS(2, stats.numRounds);
    });

    const auto splitPoint = docsToInsert.cbegin() + kFirstBatchSize;
    expectInsertsReturnSuccess(docsToInsert.cbegin(), splitPoint);
    expectInsertsReturnSuccess(splitPoint, docsToInsert.cend());

    future.timed_get(kFutureTimeout);
}
260
TEST_F(BatchWriteExecTest, SingleOpError) {
    // The shard fails the whole command with a non-retryable error. The executor should still
    // return ok, but with the shard's error code/message attached to the write as an error detail.
    const std::vector<BSONObj> docs{BSON("x" << 1)};

    BatchedCommandResponse errResponse;
    errResponse.setOk(false);
    errResponse.setErrCode(ErrorCodes::UnknownError);
    errResponse.setErrMessage("mock error");

    write_ops::WriteCommandBase commandBase;
    commandBase.setOrdered(false);

    write_ops::Insert insertOp(nss);
    insertOp.setWriteCommandBase(std::move(commandBase));
    insertOp.setDocuments(docs);

    BatchedCommandRequest request(std::move(insertOp));
    request.setWriteConcern(BSONObj());

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
        ASSERT(response.getOk());
        ASSERT_EQ(0, response.getN());
        ASSERT(response.isErrDetailsSet());
        ASSERT_EQ(errResponse.getErrCode(), response.getErrDetailsAt(0)->getErrCode());
        ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(errResponse.getErrMessage()) !=
               std::string::npos);

        ASSERT_EQ(1, stats.numRounds);
    });

    expectInsertsReturnError(docs, errResponse);

    future.timed_get(kFutureTimeout);
}
297
298 //
299 // Test retryable errors
300 //
301
TEST_F(BatchWriteExecTest, StaleOp) {
    // The first attempt fails with a stale shard version; the executor retries once and succeeds,
    // recording exactly one stale batch.
    const std::vector<BSONObj> expected{BSON("x" << 1)};

    write_ops::WriteCommandBase commandBase;
    commandBase.setOrdered(false);

    write_ops::Insert insertOp(nss);
    insertOp.setWriteCommandBase(std::move(commandBase));
    insertOp.setDocuments(expected);

    BatchedCommandRequest request(std::move(insertOp));
    request.setWriteConcern(BSONObj());

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
        ASSERT(response.getOk());

        ASSERT_EQUALS(1, stats.numStaleBatches);
    });

    // Stale reply first, then a successful one for the retry.
    expectInsertsReturnStaleVersionErrors(expected);
    expectInsertsReturnSuccess(expected);

    future.timed_get(kFutureTimeout);
}
332
TEST_F(BatchWriteExecTest, MultiStaleOp) {
    // The shard returns several StaleShardVersion errors in a row, but fewer than the number that
    // makes the executor give up, so the write eventually succeeds.
    constexpr int kNumStaleResponses = 3;
    static_assert(kNumStaleResponses < kMaxRoundsWithoutProgress,
                  "MultiStaleOp must stay below the executor's no-progress give-up threshold");

    BatchedCommandRequest request([&] {
        write_ops::Insert insertOp(nss);
        insertOp.setWriteCommandBase([] {
            write_ops::WriteCommandBase writeCommandBase;
            writeCommandBase.setOrdered(false);
            return writeCommandBase;
        }());
        insertOp.setDocuments({BSON("x" << 1)});
        return insertOp;
    }());
    request.setWriteConcern(BSONObj());

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
        ASSERT(response.getOk());

        ASSERT_EQUALS(kNumStaleResponses, stats.numStaleBatches);
    });

    const std::vector<BSONObj> expected{BSON("x" << 1)};

    // Return multiple StaleShardVersion errors, but fewer than the give-up number.
    for (int i = 0; i < kNumStaleResponses; i++) {
        expectInsertsReturnStaleVersionErrors(expected);
    }

    expectInsertsReturnSuccess(expected);

    future.timed_get(kFutureTimeout);
}
366
TEST_F(BatchWriteExecTest, TooManyStaleOp) {
    // Every attempt fails with a stale config error, and the mock nsTargeter never reports progress
    // on refresh, so the executor keeps retrying until it gives up. At that point each write in the
    // batch must be reported with a NoProgressMade error.
    const std::vector<BSONObj> docs{BSON("x" << 1), BSON("x" << 2)};

    write_ops::WriteCommandBase commandBase;
    commandBase.setOrdered(false);

    write_ops::Insert insertOp(nss);
    insertOp.setWriteCommandBase(std::move(commandBase));
    insertOp.setDocuments(docs);

    BatchedCommandRequest request(std::move(insertOp));
    request.setWriteConcern(BSONObj());

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
        ASSERT(response.getOk());
        ASSERT_EQ(0, response.getN());
        ASSERT(response.isErrDetailsSet());
        ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), ErrorCodes::NoProgressMade);
        ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade);

        ASSERT_EQUALS(stats.numStaleBatches, (1 + kMaxRoundsWithoutProgress));
    });

    // One initial attempt plus kMaxRoundsWithoutProgress retries, all answered as stale.
    int staleRepliesLeft = 1 + kMaxRoundsWithoutProgress;
    while (staleRepliesLeft-- > 0) {
        expectInsertsReturnStaleVersionErrors(docs);
    }

    future.timed_get(kFutureTimeout);
}
403
TEST_F(BatchWriteExecTest, RetryableWritesLargeBatch) {
    // The same large ordered insert as MultiOpLarge, but issued as a retryable write (the operation
    // context carries a logical session id and a txnNumber). The batch must still be split into two
    // rounds, and the first round holds fewer documents than it does in MultiOpLarge.
    // NOTE(review): the smaller first batch is presumably due to extra per-statement overhead for
    // retryable writes; confirm against the executor's batch-sizing logic.
    const int kNumDocsToInsert = 100'000;
    const std::string kDocValue(200, 'x');

    // Number of documents the executor is expected to put in the first batch. This value depends
    // on the exact size of the documents generated below; changing kDocValue or the document shape
    // requires recomputing it.
    const int kFirstBatchSize = 63791;

    std::vector<BSONObj> docsToInsert;
    docsToInsert.reserve(kNumDocsToInsert);
    for (int i = 0; i < kNumDocsToInsert; i++) {
        docsToInsert.push_back(BSON("_id" << i << "someLargeKeyToWasteSpace" << kDocValue));
    }

    BatchedCommandRequest request([&] {
        write_ops::Insert insertOp(nss);
        insertOp.setWriteCommandBase([] {
            write_ops::WriteCommandBase writeCommandBase;
            writeCommandBase.setOrdered(true);
            return writeCommandBase;
        }());
        insertOp.setDocuments(docsToInsert);
        return insertOp;
    }());
    request.setWriteConcern(BSONObj());

    operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
    operationContext()->setTxnNumber(5);

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);

        ASSERT(response.getOk());
        ASSERT_EQUALS(kNumDocsToInsert, response.getN());
        ASSERT_EQUALS(2, stats.numRounds);
    });

    const auto splitPoint = docsToInsert.cbegin() + kFirstBatchSize;
    expectInsertsReturnSuccess(docsToInsert.cbegin(), splitPoint);
    expectInsertsReturnSuccess(splitPoint, docsToInsert.cend());

    future.timed_get(kFutureTimeout);
}
446
TEST_F(BatchWriteExecTest, RetryableErrorNoTxnNumber) {
    // Without a txnNumber on the operation context, a retryable error is NOT retried: the shard's
    // error is surfaced as a write error after a single round.
    const std::vector<BSONObj> docs{BSON("x" << 1), BSON("x" << 2)};

    write_ops::WriteCommandBase commandBase;
    commandBase.setOrdered(true);

    write_ops::Insert insertOp(nss);
    insertOp.setWriteCommandBase(std::move(commandBase));
    insertOp.setDocuments(docs);

    BatchedCommandRequest request(std::move(insertOp));
    request.setWriteConcern(BSONObj());

    BatchedCommandResponse retryableErrResponse;
    retryableErrResponse.setOk(false);
    retryableErrResponse.setErrCode(ErrorCodes::NotMaster);
    retryableErrResponse.setErrMessage("mock retryable error");

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);

        ASSERT(response.getOk());
        ASSERT_EQ(0, response.getN());
        ASSERT(response.isErrDetailsSet());
        ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), retryableErrResponse.getErrCode());
        ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(
                   retryableErrResponse.getErrMessage()) != std::string::npos);
        ASSERT_EQ(1, stats.numRounds);
    });

    expectInsertsReturnError(docs, retryableErrResponse);

    future.timed_get(kFutureTimeout);
}
485
TEST_F(BatchWriteExecTest, RetryableErrorTxnNumber) {
    // With a session and txnNumber on the operation context, a retryable error is retried
    // automatically, and the retry's success is what the caller sees.
    const std::vector<BSONObj> docs{BSON("x" << 1), BSON("x" << 2)};

    write_ops::WriteCommandBase commandBase;
    commandBase.setOrdered(true);

    write_ops::Insert insertOp(nss);
    insertOp.setWriteCommandBase(std::move(commandBase));
    insertOp.setDocuments(docs);

    BatchedCommandRequest request(std::move(insertOp));
    request.setWriteConcern(BSONObj());

    operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
    operationContext()->setTxnNumber(5);

    BatchedCommandResponse retryableErrResponse;
    retryableErrResponse.setOk(false);
    retryableErrResponse.setErrCode(ErrorCodes::NotMaster);
    retryableErrResponse.setErrMessage("mock retryable error");

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);

        ASSERT(response.getOk());
        ASSERT(!response.isErrDetailsSet());
        ASSERT_EQ(1, stats.numRounds);
    });

    // First reply is the retryable error; the automatic retry then succeeds.
    expectInsertsReturnError(docs, retryableErrResponse);
    expectInsertsReturnSuccess(docs);

    future.timed_get(kFutureTimeout);
}
524
TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
    // Even with a session and txnNumber, a non-retryable error is NOT retried: it is surfaced as a
    // write error after a single round.
    const std::vector<BSONObj> docs{BSON("x" << 1), BSON("x" << 2)};

    write_ops::WriteCommandBase commandBase;
    commandBase.setOrdered(true);

    write_ops::Insert insertOp(nss);
    insertOp.setWriteCommandBase(std::move(commandBase));
    insertOp.setDocuments(docs);

    BatchedCommandRequest request(std::move(insertOp));
    request.setWriteConcern(BSONObj());

    operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
    operationContext()->setTxnNumber(5);

    BatchedCommandResponse nonRetryableErrResponse;
    nonRetryableErrResponse.setOk(false);
    nonRetryableErrResponse.setErrCode(ErrorCodes::UnknownError);
    nonRetryableErrResponse.setErrMessage("mock non-retryable error");

    auto future = launchAsync([&] {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);

        ASSERT(response.getOk());
        ASSERT_EQ(0, response.getN());
        ASSERT(response.isErrDetailsSet());
        ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(),
                      nonRetryableErrResponse.getErrCode());
        ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(
                   nonRetryableErrResponse.getErrMessage()) != std::string::npos);
        ASSERT_EQ(1, stats.numRounds);
    });

    expectInsertsReturnError(docs, nonRetryableErrResponse);

    future.timed_get(kFutureTimeout);
}
567
568 } // namespace
569 } // namespace mongo
570