1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include <set>
36 #include <string>
37 #include <vector>
38
39 #include "mongo/client/remote_command_targeter_mock.h"
40 #include "mongo/db/client.h"
41 #include "mongo/db/commands.h"
42 #include "mongo/db/ops/write_ops.h"
43 #include "mongo/db/write_concern.h"
44 #include "mongo/executor/network_interface_mock.h"
45 #include "mongo/executor/task_executor.h"
46 #include "mongo/rpc/metadata/repl_set_metadata.h"
47 #include "mongo/s/catalog/dist_lock_manager_mock.h"
48 #include "mongo/s/catalog/sharding_catalog_client_impl.h"
49 #include "mongo/s/catalog/sharding_catalog_test_fixture.h"
50 #include "mongo/s/catalog/type_changelog.h"
51 #include "mongo/s/catalog/type_chunk.h"
52 #include "mongo/s/catalog/type_collection.h"
53 #include "mongo/s/catalog/type_database.h"
54 #include "mongo/s/catalog/type_shard.h"
55 #include "mongo/s/client/shard_registry.h"
56 #include "mongo/s/grid.h"
57 #include "mongo/s/write_ops/batched_command_response.h"
58 #include "mongo/stdx/future.h"
59 #include "mongo/stdx/memory.h"
60 #include "mongo/util/log.h"
61
62 namespace mongo {
63 namespace {
64
65 using executor::NetworkInterfaceMock;
66 using executor::RemoteCommandRequest;
67 using executor::RemoteCommandResponse;
68 using executor::TaskExecutor;
69 using std::set;
70 using std::string;
71 using std::vector;
72 using unittest::assertGet;
73
// Both suites are plain aliases of ShardingCatalogTestFixture; the two names only separate the
// insert tests from the update tests.
using InsertRetryTest = ShardingCatalogTestFixture;
using UpdateRetryTest = ShardingCatalogTestFixture;

// Namespace passed to insertConfigDocument() / updateConfigDocument() by every test in this file.
const NamespaceString kTestNamespace("config.TestColl");
// Hosts that tests hand to the mock config targeter and compare against request.target.
const HostAndPort kTestHosts[] = {
    HostAndPort("TestHost1:12345"), HostAndPort("TestHost2:12345"), HostAndPort("TestHost3:12345")};
80
/**
 * Attempt 1 is sent to kTestHosts[0] and answered with InterruptedDueToReplStateChange; attempt 2
 * is sent to kTestHosts[1] and answered with NetworkTimeout. After each failure the mock targeter
 * is pointed at the next host. The remaining request is consumed by expectInserts() and
 * insertConfigDocument() is expected to return OK.
 */
TEST_F(InsertRetryTest, RetryOnInterruptedAndNetworkErrorSuccess) {
    configTargeter()->setFindHostReturnValue(kTestHosts[0]);

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertDone = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // First attempt: fail with an interruption and move the targeter on to the second host.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue(kTestHosts[1]);
        return Status(ErrorCodes::InterruptedDueToReplStateChange, "Interruption");
    });

    // Second attempt: fail with a network timeout and move the targeter on to the third host.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[1]);
        configTargeter()->setFindHostReturnValue(kTestHosts[2]);
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Third attempt is handled by the fixture helper (presumably replies with success -- see
    // ShardingCatalogTestFixture::expectInserts).
    expectInserts(kTestNamespace, {doc});

    insertDone.timed_get(kFutureTimeout);
}
112
/**
 * Every one of three consecutive insert attempts is answered with NetworkTimeout. Attempt i must
 * be targeted at kTestHosts[i]; the first two handlers repoint the targeter at the next host.
 * insertConfigDocument() is expected to give up and report NetworkTimeout.
 */
TEST_F(InsertRetryTest, RetryOnNetworkErrorFails) {
    // Number of attempts this test answers; equal to the number of entries in kTestHosts.
    constexpr int kAttempts = 3;

    configTargeter()->setFindHostReturnValue(kTestHosts[0]);

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertDone = launchAsync([&] {
        const Status insertStatus =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQ(ErrorCodes::NetworkTimeout, insertStatus.code());
    });

    for (int attempt = 0; attempt < kAttempts; ++attempt) {
        onCommand([&](const RemoteCommandRequest& req) {
            ASSERT_EQ(req.target, kTestHosts[attempt]);
            // The last attempt leaves the targeter untouched.
            if (attempt + 1 < kAttempts) {
                configTargeter()->setFindHostReturnValue(kTestHosts[attempt + 1]);
            }
            return Status(ErrorCodes::NetworkTimeout, "Network timeout");
        });
    }

    insertDone.timed_get(kFutureTimeout);
}
147
/**
 * Attempt 1 (kTestHosts[0]) fails with NetworkTimeout and attempt 2 (kTestHosts[1]) fails with
 * DuplicateKey. The follow-up find on {_id: 1} returns a document identical to the one being
 * inserted, and insertConfigDocument() is expected to return OK.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterNetworkErrorMatch) {
    configTargeter()->setFindHostReturnValue(kTestHosts[0]);

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertDone = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // First attempt times out; later requests are routed to the second host.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue(kTestHosts[1]);
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Second attempt reports that the _id already exists.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[1]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // The lookup by _id returns exactly the document we tried to insert.
    onFindCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[1]);

        const BSONObj expectedFilter = BSON("_id" << 1);
        auto findRequest =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, req.cmdObj, false));
        ASSERT_BSONOBJ_EQ(expectedFilter, findRequest->getFilter());

        return vector<BSONObj>{doc};
    });

    insertDone.timed_get(kFutureTimeout);
}
185
/**
 * Attempt 1 (kTestHosts[0]) fails with NetworkTimeout and attempt 2 (kTestHosts[1]) fails with
 * DuplicateKey. The follow-up find on {_id: 1} returns no documents, and insertConfigDocument()
 * is expected to report DuplicateKey.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterNetworkErrorNotFound) {
    configTargeter()->setFindHostReturnValue(kTestHosts[0]);

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertDone = launchAsync([&] {
        const Status insertStatus =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQ(ErrorCodes::DuplicateKey, insertStatus.code());
    });

    // First attempt times out; later requests are routed to the second host.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue(kTestHosts[1]);
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Second attempt reports that the _id already exists.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[1]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // The lookup by _id comes back empty.
    onFindCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[1]);

        const BSONObj expectedFilter = BSON("_id" << 1);
        auto findRequest =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, req.cmdObj, false));
        ASSERT_BSONOBJ_EQ(expectedFilter, findRequest->getFilter());

        return vector<BSONObj>{};
    });

    insertDone.timed_get(kFutureTimeout);
}
223
/**
 * Attempt 1 (kTestHosts[0]) fails with NetworkTimeout and attempt 2 (kTestHosts[1]) fails with
 * DuplicateKey. The follow-up find on {_id: 1} returns a document with the same _id but a
 * different "Value", and insertConfigDocument() is expected to report DuplicateKey.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterNetworkErrorMismatch) {
    configTargeter()->setFindHostReturnValue(kTestHosts[0]);

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertDone = launchAsync([&] {
        const Status insertStatus =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQ(ErrorCodes::DuplicateKey, insertStatus.code());
    });

    // First attempt times out; later requests are routed to the second host.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue(kTestHosts[1]);
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Second attempt reports that the _id already exists.
    onCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[1]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // The lookup by _id returns a document whose contents differ from the one being inserted.
    onFindCommand([&](const RemoteCommandRequest& req) {
        ASSERT_EQ(req.target, kTestHosts[1]);

        const BSONObj expectedFilter = BSON("_id" << 1);
        auto findRequest =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, req.cmdObj, false));
        ASSERT_BSONOBJ_EQ(expectedFilter, findRequest->getFilter());

        const BSONObj storedDoc = BSON("_id" << 1 << "Value"
                                             << "TestValue has changed");
        return vector<BSONObj>{storedDoc};
    });

    insertDone.timed_get(kFutureTimeout);
}
262
/**
 * The first insert attempt is answered with ok:1, n:1 plus a write concern error (NetworkTimeout,
 * errInfo {wtimeout: true}). The second attempt is answered with DuplicateKey. The follow-up find
 * on {_id: 1} returns a document identical to the one being inserted, and insertConfigDocument()
 * is expected to return OK.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterWriteConcernFailureMatch) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    BSONObj objToInsert = BSON("_id" << 1 << "Value"
                                     << "TestValue");

    auto future = launchAsync([&] {
        Status status =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  objToInsert,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_OK(status);
    });

    // First attempt: the write itself is reported as applied (n == 1), but the response carries a
    // write concern error.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
        const auto insertOp = InsertOp::parse(opMsgRequest);
        ASSERT_EQUALS(kTestNamespace.ns(), insertOp.getNamespace().ns());

        BatchedCommandResponse response;
        response.setOk(true);
        response.setN(1);

        auto wcError = stdx::make_unique<WriteConcernErrorDetail>();

        Status wcStatus(ErrorCodes::NetworkTimeout, "Failed to wait for write concern");
        wcError->setErrCode(wcStatus.code());
        wcError->setErrMessage(wcStatus.reason());
        wcError->setErrInfo(BSON("wtimeout" << true));

        // The raw pointer is handed over to the response object.
        response.setWriteConcernError(wcError.release());

        return response.toBSON();
    });

    // Second attempt: the same host reports that the _id already exists.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // The lookup by _id returns exactly the document we tried to insert.
    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        auto query =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("_id" << 1), query->getFilter());

        return vector<BSONObj>{objToInsert};
    });

    future.timed_get(kFutureTimeout);
}
319
/**
 * A single update request is answered with ok:1, nModified:1 and updateConfigDocument() is
 * expected to return OK.
 */
TEST_F(UpdateRetryTest, Success) {
    const HostAndPort configHost("TestHost1");
    configTargeter()->setFindHostReturnValue(configHost);

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateDone = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,  // presumably 'upsert' -- confirm
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    onCommand([&](const RemoteCommandRequest& req) {
        // The request must parse as an update against the test namespace.
        const auto parsedUpdate =
            UpdateOp::parse(OpMsgRequest::fromDBAndBody(req.dbname, req.cmdObj));
        ASSERT_EQUALS(kTestNamespace.ns(), parsedUpdate.getNamespace().ns());

        BatchedCommandResponse reply;
        reply.setOk(true);
        reply.setNModified(1);
        return reply.toBSON();
    });

    updateDone.timed_get(kFutureTimeout);
}
353
/**
 * Three consecutive update requests are each answered with a NotMaster command error, and
 * updateConfigDocument() is expected to report NotMaster.
 */
TEST_F(UpdateRetryTest, NotMasterErrorReturnedPersistently) {
    const HostAndPort configHost("TestHost1");
    configTargeter()->setFindHostReturnValue(configHost);

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateDone = launchAsync([&] {
        auto updateResult =
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,  // presumably 'upsert' -- confirm
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQUALS(ErrorCodes::NotMaster, updateResult);
    });

    // The same failing reply is served for every attempt.
    const auto replyNotMaster = [](const RemoteCommandRequest&) {
        BatchedCommandResponse reply;
        reply.setOk(false);
        reply.setErrCode(ErrorCodes::NotMaster);
        reply.setErrMessage("not master");
        return reply.toBSON();
    };

    for (int remaining = 3; remaining > 0; --remaining) {
        onCommand(replyNotMaster);
    }

    updateDone.timed_get(kFutureTimeout);
}
386
/**
 * The mock targeter itself is configured to return NotMaster, no network request is answered,
 * and updateConfigDocument() is expected to report NotMaster.
 */
TEST_F(UpdateRetryTest, NotMasterReturnedFromTargeter) {
    const Status targetingError(ErrorCodes::NotMaster, "not master");
    configTargeter()->setFindHostReturnValue(targetingError);

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateDone = launchAsync([&] {
        auto updateResult =
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,  // presumably 'upsert' -- confirm
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQUALS(ErrorCodes::NotMaster, updateResult);
    });

    updateDone.timed_get(kFutureTimeout);
}
408
/**
 * The first update request (sent to host1) is answered with a NotMaster command error, and the
 * mock targeter is switched to host2 before that reply is returned. The second request is
 * answered with ok:1, nModified:1 and updateConfigDocument() is expected to return OK.
 */
TEST_F(UpdateRetryTest, NotMasterOnceSuccessAfterRetry) {
    HostAndPort host1("TestHost1");
    HostAndPort host2("TestHost2");
    configTargeter()->setFindHostReturnValue(host1);

    BSONObj objToUpdate = BSON("_id" << 1 << "Value"
                                     << "TestValue");
    BSONObj updateExpr = BSON("$set" << BSON("Value"
                                             << "NewTestValue"));

    auto future = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  objToUpdate,
                                                  updateExpr,
                                                  false,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // First attempt: must go to host1 and fails with NotMaster.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(host1, request.target);

        BatchedCommandResponse response;
        response.setOk(false);
        response.setErrCode(ErrorCodes::NotMaster);
        response.setErrMessage("not master");

        // Ensure that when the catalog manager tries to retarget after getting the
        // NotMaster response, it will get back a new target.
        configTargeter()->setFindHostReturnValue(host2);
        return response.toBSON();
    });

    // Second attempt: must parse as an update on the test namespace and succeeds.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
        const auto updateOp = UpdateOp::parse(opMsgRequest);
        ASSERT_EQUALS(kTestNamespace.ns(), updateOp.getNamespace().ns());

        BatchedCommandResponse response;
        response.setOk(true);
        response.setNModified(1);

        return response.toBSON();
    });

    future.timed_get(kFutureTimeout);
}
464
/**
 * The first update request is answered with a response that carries a single write error
 * (index 0, InterruptedDueToReplStateChange). The second request is answered with ok:1,
 * nModified:1 and updateConfigDocument() is expected to return OK.
 */
TEST_F(UpdateRetryTest, OperationInterruptedDueToPrimaryStepDown) {
    configTargeter()->setFindHostReturnValue(kTestHosts[0]);

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateDone = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,  // presumably 'upsert' -- confirm
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // First attempt: reply with a per-document write error reporting the interruption.
    onCommand([&](const RemoteCommandRequest& req) {
        const auto parsedUpdate =
            UpdateOp::parse(OpMsgRequest::fromDBAndBody(req.dbname, req.cmdObj));
        ASSERT_EQUALS(kTestNamespace.ns(), parsedUpdate.getNamespace().ns());

        BatchedCommandResponse reply;

        auto interrupted = stdx::make_unique<WriteErrorDetail>();
        interrupted->setIndex(0);
        interrupted->setErrCode(ErrorCodes::InterruptedDueToReplStateChange);
        interrupted->setErrMessage("Operation interrupted");
        // The raw pointer is handed over to the response object.
        reply.addToErrDetails(interrupted.release());

        return reply.toBSON();
    });

    // Second attempt: plain success.
    onCommand([&](const RemoteCommandRequest& req) {
        const auto parsedUpdate =
            UpdateOp::parse(OpMsgRequest::fromDBAndBody(req.dbname, req.cmdObj));
        ASSERT_EQUALS(kTestNamespace.ns(), parsedUpdate.getNamespace().ns());

        BatchedCommandResponse reply;
        reply.setOk(true);
        reply.setNModified(1);
        return reply.toBSON();
    });

    updateDone.timed_get(kFutureTimeout);
}
514
/**
 * The first update request is answered with ok:1, nModified:1 plus a write concern error
 * (NetworkTimeout, errInfo {wtimeout: true}). The second request is answered with ok:1,
 * nModified:0 and updateConfigDocument() is expected to return OK.
 */
TEST_F(UpdateRetryTest, WriteConcernFailure) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    BSONObj objToUpdate = BSON("_id" << 1 << "Value"
                                     << "TestValue");
    BSONObj updateExpr = BSON("$set" << BSON("Value"
                                             << "NewTestValue"));

    auto future = launchAsync([&] {
        auto status =
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  objToUpdate,
                                                  updateExpr,
                                                  false,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_OK(status);
    });

    // First attempt: the update is reported as applied, but the response carries a write concern
    // error.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
        const auto updateOp = UpdateOp::parse(opMsgRequest);
        ASSERT_EQUALS(kTestNamespace.ns(), updateOp.getNamespace().ns());

        BatchedCommandResponse response;
        response.setOk(true);
        response.setNModified(1);

        auto wcError = stdx::make_unique<WriteConcernErrorDetail>();

        Status wcStatus(ErrorCodes::NetworkTimeout, "Failed to wait for write concern");
        wcError->setErrCode(wcStatus.code());
        wcError->setErrMessage(wcStatus.reason());
        wcError->setErrInfo(BSON("wtimeout" << true));

        // The raw pointer is handed over to the response object.
        response.setWriteConcernError(wcError.release());

        return response.toBSON();
    });

    // Second attempt: succeeds and reports that nothing was modified.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
        const auto updateOp = UpdateOp::parse(opMsgRequest);
        ASSERT_EQUALS(kTestNamespace.ns(), updateOp.getNamespace().ns());

        BatchedCommandResponse response;
        response.setOk(true);
        response.setNModified(0);

        return response.toBSON();
    });

    future.timed_get(kFutureTimeout);
}
573
574 } // namespace
575 } // namespace mongo
576