1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32 
33 #include "mongo/platform/basic.h"
34 
35 #include <set>
36 #include <string>
37 #include <vector>
38 
39 #include "mongo/client/remote_command_targeter_mock.h"
40 #include "mongo/db/client.h"
41 #include "mongo/db/commands.h"
42 #include "mongo/db/ops/write_ops.h"
43 #include "mongo/db/write_concern.h"
44 #include "mongo/executor/network_interface_mock.h"
45 #include "mongo/executor/task_executor.h"
46 #include "mongo/rpc/metadata/repl_set_metadata.h"
47 #include "mongo/s/catalog/dist_lock_manager_mock.h"
48 #include "mongo/s/catalog/sharding_catalog_client_impl.h"
49 #include "mongo/s/catalog/sharding_catalog_test_fixture.h"
50 #include "mongo/s/catalog/type_changelog.h"
51 #include "mongo/s/catalog/type_chunk.h"
52 #include "mongo/s/catalog/type_collection.h"
53 #include "mongo/s/catalog/type_database.h"
54 #include "mongo/s/catalog/type_shard.h"
55 #include "mongo/s/client/shard_registry.h"
56 #include "mongo/s/grid.h"
57 #include "mongo/s/write_ops/batched_command_response.h"
58 #include "mongo/stdx/future.h"
59 #include "mongo/stdx/memory.h"
60 #include "mongo/util/log.h"
61 
62 namespace mongo {
63 namespace {
64 
65 using executor::NetworkInterfaceMock;
66 using executor::RemoteCommandRequest;
67 using executor::RemoteCommandResponse;
68 using executor::TaskExecutor;
69 using std::set;
70 using std::string;
71 using std::vector;
72 using unittest::assertGet;
73 
74 using InsertRetryTest = ShardingCatalogTestFixture;
75 using UpdateRetryTest = ShardingCatalogTestFixture;
76 
77 const NamespaceString kTestNamespace("config.TestColl");
78 const HostAndPort kTestHosts[] = {
79     HostAndPort("TestHost1:12345"), HostAndPort("TestHost2:12345"), HostAndPort("TestHost3:12345")};
80 
/**
 * Scripts two failed insert attempts (InterruptedDueToReplStateChange from the first host, then
 * NetworkTimeout from the second), re-pointing the targeter at the next host after each failure.
 * The third attempt is served by expectInserts and the insert is expected to return OK.
 */
TEST_F(InsertRetryTest, RetryOnInterruptedAndNetworkErrorSuccess) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertFuture = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // Attempt 1: the first host reports an interruption; move the targeter to the second host.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue({kTestHosts[1]});
        return Status(ErrorCodes::InterruptedDueToReplStateChange, "Interruption");
    });

    // Attempt 2: the second host times out; move the targeter to the third host.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        configTargeter()->setFindHostReturnValue({kTestHosts[2]});
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Attempt 3: the insert of the document is expected and answered by the fixture helper.
    expectInserts(kTestNamespace, {doc});

    insertFuture.timed_get(kFutureTimeout);
}
112 
/**
 * Answers three consecutive insert attempts (one per test host) with NetworkTimeout and expects
 * the insert to report NetworkTimeout to the caller.
 */
TEST_F(InsertRetryTest, RetryOnNetworkErrorFails) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertFuture = launchAsync([&] {
        const Status insertStatus =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQ(ErrorCodes::NetworkTimeout, insertStatus.code());
    });

    // Attempt 1: first host times out; the next attempt should be targeted at the second host.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue({kTestHosts[1]});
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Attempt 2: second host times out; the next attempt should be targeted at the third host.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        configTargeter()->setFindHostReturnValue({kTestHosts[2]});
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Attempt 3: third host times out as well; no further responses are scripted.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[2]);
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    insertFuture.timed_get(kFutureTimeout);
}
147 
/**
 * The first insert attempt times out and the retry is answered with DuplicateKey. A find on the
 * document's _id is then expected; it returns a document identical to the one being inserted, and
 * the insert is expected to return OK.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterNetworkErrorMatch) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertFuture = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // Attempt 1: network timeout on the first host; re-point the targeter at the second host.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue({kTestHosts[1]});
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Attempt 2: the second host rejects the insert with a duplicate key error.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // Follow-up find by _id: hand back exactly the document that was being inserted.
    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        const auto findRequest =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("_id" << 1), findRequest->getFilter());

        return vector<BSONObj>{doc};
    });

    insertFuture.timed_get(kFutureTimeout);
}
185 
/**
 * The first insert attempt times out and the retry is answered with DuplicateKey. The follow-up
 * find on the document's _id returns no documents, and the insert is expected to report
 * DuplicateKey.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterNetworkErrorNotFound) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertFuture = launchAsync([&] {
        const Status insertStatus =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQ(ErrorCodes::DuplicateKey, insertStatus.code());
    });

    // Attempt 1: network timeout on the first host; re-point the targeter at the second host.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue({kTestHosts[1]});
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Attempt 2: the second host rejects the insert with a duplicate key error.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // Follow-up find by _id: respond with an empty result set.
    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        const auto findRequest =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("_id" << 1), findRequest->getFilter());

        return vector<BSONObj>();
    });

    insertFuture.timed_get(kFutureTimeout);
}
223 
/**
 * The first insert attempt times out and the retry is answered with DuplicateKey. The follow-up
 * find on the document's _id returns a document with the same _id but a different "Value", and
 * the insert is expected to report DuplicateKey.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterNetworkErrorMismatch) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    const BSONObj doc = BSON("_id" << 1 << "Value"
                                   << "TestValue");

    auto insertFuture = launchAsync([&] {
        const Status insertStatus =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  doc,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQ(ErrorCodes::DuplicateKey, insertStatus.code());
    });

    // Attempt 1: network timeout on the first host; re-point the targeter at the second host.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        configTargeter()->setFindHostReturnValue({kTestHosts[1]});
        return Status(ErrorCodes::NetworkTimeout, "Network timeout");
    });

    // Attempt 2: the second host rejects the insert with a duplicate key error.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // Follow-up find by _id: respond with a document whose contents differ from `doc`.
    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[1]);
        const auto findRequest =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("_id" << 1), findRequest->getFilter());

        return vector<BSONObj>{BSON("_id" << 1 << "Value"
                                          << "TestValue has changed")};
    });

    insertFuture.timed_get(kFutureTimeout);
}
262 
/**
 * The first insert attempt is answered with ok:1, n:1 plus a write concern error (NetworkTimeout,
 * wtimeout). The next attempt is answered with DuplicateKey, after which a find on the document's
 * _id returns a document identical to the one being inserted. The insert is expected to return
 * OK.
 */
TEST_F(InsertRetryTest, DuplicateKeyErrorAfterWriteConcernFailureMatch) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    const BSONObj objToInsert = BSON("_id" << 1 << "Value"
                                           << "TestValue");

    auto future = launchAsync([&] {
        Status status =
            catalogClient()->insertConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  objToInsert,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_OK(status);
    });

    // Attempt 1: the write is reported as applied, but with a write concern timeout attached.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
        const auto insertOp = InsertOp::parse(opMsgRequest);
        ASSERT_EQUALS(kTestNamespace.ns(), insertOp.getNamespace().ns());

        BatchedCommandResponse response;
        response.setOk(true);
        response.setN(1);

        const Status wcStatus(ErrorCodes::NetworkTimeout, "Failed to wait for write concern");

        auto wcError = stdx::make_unique<WriteConcernErrorDetail>();
        wcError->setErrCode(wcStatus.code());
        wcError->setErrMessage(wcStatus.reason());
        wcError->setErrInfo(BSON("wtimeout" << true));

        // The raw pointer is handed over to the response via release().
        response.setWriteConcernError(wcError.release());

        return response.toBSON();
    });

    // Attempt 2: the same host now rejects the insert with a duplicate key error.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        return Status(ErrorCodes::DuplicateKey, "Duplicate key");
    });

    // Follow-up find by _id: hand back exactly the document that was being inserted.
    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQ(request.target, kTestHosts[0]);
        auto query =
            assertGet(QueryRequest::makeFromFindCommand(kTestNamespace, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("_id" << 1), query->getFilter());

        return vector<BSONObj>{objToInsert};
    });

    future.timed_get(kFutureTimeout);
}
319 
/**
 * Single-attempt happy path: the update command is answered with ok:1, nModified:1 and
 * updateConfigDocument is expected to return OK.
 */
TEST_F(UpdateRetryTest, Success) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateFuture = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // Check the command parses as an update against the test namespace, then report success.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto parsedUpdate =
            UpdateOp::parse(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
        ASSERT_EQUALS(kTestNamespace.ns(), parsedUpdate.getNamespace().ns());

        BatchedCommandResponse reply;
        reply.setOk(true);
        reply.setNModified(1);

        return reply.toBSON();
    });

    updateFuture.timed_get(kFutureTimeout);
}
353 
/**
 * Answers three consecutive update attempts with a NotMaster command error and expects
 * updateConfigDocument to report NotMaster.
 */
TEST_F(UpdateRetryTest, NotMasterErrorReturnedPersistently) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateFuture = launchAsync([&] {
        const auto updateResult =
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQUALS(ErrorCodes::NotMaster, updateResult);
    });

    // Number of attempts this test scripts a NotMaster reply for.
    constexpr int kScriptedAttempts = 3;
    for (int attempt = 0; attempt < kScriptedAttempts; ++attempt) {
        onCommand([](const RemoteCommandRequest&) {
            BatchedCommandResponse reply;
            reply.setOk(false);
            reply.setErrCode(ErrorCodes::NotMaster);
            reply.setErrMessage("not master");

            return reply.toBSON();
        });
    }

    updateFuture.timed_get(kFutureTimeout);
}
386 
/**
 * The targeter itself is set up to return a NotMaster status, and no command responses are
 * scripted. updateConfigDocument is expected to report NotMaster.
 */
TEST_F(UpdateRetryTest, NotMasterReturnedFromTargeter) {
    configTargeter()->setFindHostReturnValue(Status(ErrorCodes::NotMaster, "not master"));

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateFuture = launchAsync([&] {
        const auto updateResult =
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,
                                                  ShardingCatalogClient::kMajorityWriteConcern);
        ASSERT_EQUALS(ErrorCodes::NotMaster, updateResult);
    });

    updateFuture.timed_get(kFutureTimeout);
}
408 
/**
 * The first update attempt (sent to host1) is answered with a NotMaster command error and the
 * targeter is re-pointed at host2. The second attempt is answered with ok:1, nModified:1, and
 * updateConfigDocument is expected to return OK.
 */
TEST_F(UpdateRetryTest, NotMasterOnceSuccessAfterRetry) {
    const HostAndPort host1("TestHost1");
    const HostAndPort host2("TestHost2");
    configTargeter()->setFindHostReturnValue(host1);

    const BSONObj objToUpdate = BSON("_id" << 1 << "Value"
                                           << "TestValue");
    const BSONObj updateExpr = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto future = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  objToUpdate,
                                                  updateExpr,
                                                  false,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // Attempt 1: host1 answers with a NotMaster error.
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(host1, request.target);

        BatchedCommandResponse response;
        response.setOk(false);
        response.setErrCode(ErrorCodes::NotMaster);
        response.setErrMessage("not master");

        // Ensure that when the catalog manager tries to retarget after getting the
        // NotMaster response, it will get back a new target.
        configTargeter()->setFindHostReturnValue(host2);
        return response.toBSON();
    });

    // Attempt 2: check the command is an update against the test namespace and report success.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
        const auto updateOp = UpdateOp::parse(opMsgRequest);
        ASSERT_EQUALS(kTestNamespace.ns(), updateOp.getNamespace().ns());

        BatchedCommandResponse response;
        response.setOk(true);
        response.setNModified(1);

        return response.toBSON();
    });

    future.timed_get(kFutureTimeout);
}
464 
/**
 * The first update attempt is answered with a batch response carrying a single write error
 * (InterruptedDueToReplStateChange at index 0). The second attempt is answered with ok:1,
 * nModified:1, and updateConfigDocument is expected to return OK.
 */
TEST_F(UpdateRetryTest, OperationInterruptedDueToPrimaryStepDown) {
    configTargeter()->setFindHostReturnValue({kTestHosts[0]});

    const BSONObj queryDoc = BSON("_id" << 1 << "Value"
                                        << "TestValue");
    const BSONObj updateSpec = BSON("$set" << BSON("Value"
                                                   << "NewTestValue"));

    auto updateFuture = launchAsync([&] {
        ASSERT_OK(
            catalogClient()->updateConfigDocument(operationContext(),
                                                  kTestNamespace.ns(),
                                                  queryDoc,
                                                  updateSpec,
                                                  false,
                                                  ShardingCatalogClient::kMajorityWriteConcern));
    });

    // Attempt 1: reply with a per-write error saying the operation was interrupted.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto parsedUpdate =
            UpdateOp::parse(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
        ASSERT_EQUALS(kTestNamespace.ns(), parsedUpdate.getNamespace().ns());

        auto interrupted = stdx::make_unique<WriteErrorDetail>();
        interrupted->setIndex(0);
        interrupted->setErrCode(ErrorCodes::InterruptedDueToReplStateChange);
        interrupted->setErrMessage("Operation interrupted");

        BatchedCommandResponse reply;
        // The raw pointer is handed over to the response via release().
        reply.addToErrDetails(interrupted.release());

        return reply.toBSON();
    });

    // Attempt 2: the retried update is reported as successful.
    onCommand([&](const RemoteCommandRequest& request) {
        const auto parsedUpdate =
            UpdateOp::parse(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
        ASSERT_EQUALS(kTestNamespace.ns(), parsedUpdate.getNamespace().ns());

        BatchedCommandResponse reply;
        reply.setOk(true);
        reply.setNModified(1);

        return reply.toBSON();
    });

    updateFuture.timed_get(kFutureTimeout);
}
514 
TEST_F(UpdateRetryTest,WriteConcernFailure)515 TEST_F(UpdateRetryTest, WriteConcernFailure) {
516     configTargeter()->setFindHostReturnValue({kTestHosts[0]});
517 
518     BSONObj objToUpdate = BSON("_id" << 1 << "Value"
519                                      << "TestValue");
520     BSONObj updateExpr = BSON("$set" << BSON("Value"
521                                              << "NewTestValue"));
522 
523     auto future = launchAsync([&] {
524         auto status =
525             catalogClient()->updateConfigDocument(operationContext(),
526                                                   kTestNamespace.ns(),
527                                                   objToUpdate,
528                                                   updateExpr,
529                                                   false,
530                                                   ShardingCatalogClient::kMajorityWriteConcern);
531         ASSERT_OK(status);
532     });
533 
534     onCommand([&](const RemoteCommandRequest& request) {
535         const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
536         const auto updateOp = UpdateOp::parse(opMsgRequest);
537         ASSERT_EQUALS(kTestNamespace.ns(), updateOp.getNamespace().ns());
538 
539         BatchedCommandResponse response;
540         response.setOk(true);
541         response.setNModified(1);
542 
543         auto wcError = stdx::make_unique<WriteConcernErrorDetail>();
544 
545         WriteConcernResult wcRes;
546         wcRes.err = "timeout";
547         wcRes.wTimedOut = true;
548 
549         Status wcStatus(ErrorCodes::NetworkTimeout, "Failed to wait for write concern");
550         wcError->setErrCode(wcStatus.code());
551         wcError->setErrMessage(wcStatus.reason());
552         wcError->setErrInfo(BSON("wtimeout" << true));
553 
554         response.setWriteConcernError(wcError.release());
555 
556         return response.toBSON();
557     });
558 
559     onCommand([&](const RemoteCommandRequest& request) {
560         const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
561         const auto updateOp = UpdateOp::parse(opMsgRequest);
562         ASSERT_EQUALS(kTestNamespace.ns(), updateOp.getNamespace().ns());
563 
564         BatchedCommandResponse response;
565         response.setOk(true);
566         response.setNModified(0);
567 
568         return response.toBSON();
569     });
570 
571     future.timed_get(kFutureTimeout);
572 }
573 
574 }  // namespace
575 }  // namespace mongo
576