/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/catalog/replset_dist_lock_manager.h"

#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/dist_lock_catalog.h"
#include "mongo/s/catalog/type_lockpings.h"
#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/chrono.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"

namespace mongo {

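// Failpoint that overrides the lock expiration timeout used when deciding whether an existing
// lock can be overtaken (see the MONGO_FAIL_POINT_BLOCK in lockWithSessionID below, which reads
// the "timeoutMs" field from the failpoint data).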
MONGO_FP_DECLARE(setDistLockTimeout);

using std::string;
using std::unique_ptr;

namespace {

// How many times to retry acquiring the lock after the first attempt fails
const int kMaxNumLockAcquireRetries = 2;

// How frequently to poll the distributed lock when it is found to be locked
const Milliseconds kLockRetryInterval(500);

}  // namespace

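// Every live process refreshes its ping document every kDistLockPingInterval. A lock holder is
// only considered dead (and its lock eligible for overtaking in isLockExpired) once its ping has
// not advanced for at least kDistLockExpirationTime, so the expiration time must be much larger
// than the ping interval.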
const Seconds ReplSetDistLockManager::kDistLockPingInterval{30};
const Minutes ReplSetDistLockManager::kDistLockExpirationTime{15};

ReplSetDistLockManager::ReplSetDistLockManager(ServiceContext* globalContext,
                                               StringData processID,
                                               unique_ptr<DistLockCatalog> catalog,
                                               Milliseconds pingInterval,
                                               Milliseconds lockExpiration)
    : _serviceContext(globalContext),
      _processID(processID.toString()),
      _catalog(std::move(catalog)),
      _pingInterval(pingInterval),
      _lockExpiration(lockExpiration) {}

ReplSetDistLockManager::~ReplSetDistLockManager() = default;

void ReplSetDistLockManager::startUp() {
    if (!_execThread) {
        _execThread = stdx::make_unique<stdx::thread>(&ReplSetDistLockManager::doTask, this);
    }
}

void ReplSetDistLockManager::shutDown(OperationContext* opCtx) {
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _isShutDown = true;
        _shutDownCV.notify_all();
    }

    // Don't grab _mutex here, otherwise we will deadlock trying to join. It is safe to read
    // _execThread without the lock since it is modified only in startUp().
    if (_execThread && _execThread->joinable()) {
        _execThread->join();
        _execThread.reset();
    }

    auto status = _catalog->stopPing(opCtx, _processID);
    if (!status.isOK()) {
        warning() << "error encountered while cleaning up distributed ping entry for " << _processID
                  << causedBy(redact(status));
    }
}

std::string ReplSetDistLockManager::getProcessID() {
    return _processID;
}

bool ReplSetDistLockManager::isShutDown() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _isShutDown;
}

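// Body of the pinger thread started by startUp(). On every iteration it (1) refreshes this
// process's ping document so other processes can see that it is still alive, and (2) retries any
// unlock requests that were previously deferred via queueUnlock().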
void ReplSetDistLockManager::doTask() {
    LOG(0) << "creating distributed lock ping thread for process " << _processID
           << " (sleeping for " << _pingInterval << ")";

    Timer elapsedSinceLastPing(_serviceContext->getTickSource());
    Client::initThread("replSetDistLockPinger");

    while (!isShutDown()) {
        {
            auto opCtx = cc().makeOperationContext();
            auto pingStatus = _catalog->ping(opCtx.get(), _processID, Date_t::now());

            if (!pingStatus.isOK() && pingStatus != ErrorCodes::NotMaster) {
                warning() << "pinging failed for distributed lock pinger" << causedBy(pingStatus);
            }

            const Milliseconds elapsed(elapsedSinceLastPing.millis());
            if (elapsed > 10 * _pingInterval) {
                warning() << "Lock pinger for process " << _processID << " was inactive for "
                          << elapsed;
            }
            elapsedSinceLastPing.reset();

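            // Drain the queue of deferred unlock requests. Each entry identifies a lock by its
            // session id (ts) and, optionally, by its name (_id), and is re-queued if the unlock
            // attempt fails for any reason other than this node no longer being primary.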
            std::deque<std::pair<DistLockHandle, boost::optional<std::string>>> toUnlockBatch;
            {
                stdx::unique_lock<stdx::mutex> lk(_mutex);
                toUnlockBatch.swap(_unlockList);
            }

            for (const auto& toUnlock : toUnlockBatch) {
                std::string nameMessage;
                Status unlockStatus(ErrorCodes::NotYetInitialized,
                                    "status unlock not initialized!");
                if (toUnlock.second) {
                    // A non-empty _id (name) field was provided, unlock by ts (sessionId) and _id.
                    unlockStatus = _catalog->unlock(opCtx.get(), toUnlock.first, *toUnlock.second);
                    nameMessage = " and " + LocksType::name() + ": " + *toUnlock.second;
                } else {
                    unlockStatus = _catalog->unlock(opCtx.get(), toUnlock.first);
                }

                if (!unlockStatus.isOK()) {
                    warning() << "Failed to unlock lock with " << LocksType::lockID() << ": "
                              << toUnlock.first << nameMessage << causedBy(unlockStatus);
                    // Queue another attempt, unless the problem was no longer being primary.
                    if (unlockStatus != ErrorCodes::NotMaster) {
                        queueUnlock(toUnlock.first, toUnlock.second);
                    }
                } else {
                    LOG(0) << "distributed lock with " << LocksType::lockID() << ": "
                           << toUnlock.first << nameMessage << " unlocked.";
                }

                if (isShutDown()) {
                    return;
                }
            }
        }

        stdx::unique_lock<stdx::mutex> lk(_mutex);
        MONGO_IDLE_THREAD_BLOCK;
        _shutDownCV.wait_for(lk, _pingInterval.toSystemDuration(), [this] { return _isShutDown; });
    }
}

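// Decides whether the lock described by 'lockDoc' may be overtaken. A lock is only considered
// expired once its holder's ping document has not advanced for at least 'lockExpiration', as
// measured against the config server's own clock, since local clocks across processes cannot be
// assumed to be synchronized. The first observation of any given lock only seeds _pingHistory and
// never reports the lock as expired.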
StatusWith<bool> ReplSetDistLockManager::isLockExpired(OperationContext* opCtx,
                                                       LocksType lockDoc,
                                                       const Milliseconds& lockExpiration) {
    const auto& processID = lockDoc.getProcess();
    auto pingStatus = _catalog->getPing(opCtx, processID);

    Date_t pingValue;
    if (pingStatus.isOK()) {
        const auto& pingDoc = pingStatus.getValue();
        Status pingDocValidationStatus = pingDoc.validate();
        if (!pingDocValidationStatus.isOK()) {
            return {ErrorCodes::UnsupportedFormat,
                    str::stream() << "invalid ping document for " << processID << ": "
                                  << pingDocValidationStatus.toString()};
        }

        pingValue = pingDoc.getPing();
    } else if (pingStatus.getStatus() != ErrorCodes::NoMatchingDocument) {
        return pingStatus.getStatus();
    }  // else use the default pingValue if the ping document does not exist.

    Timer timer(_serviceContext->getTickSource());
    auto serverInfoStatus = _catalog->getServerInfo(opCtx);
    if (!serverInfoStatus.isOK()) {
        if (serverInfoStatus.getStatus() == ErrorCodes::NotMaster) {
            return false;
        }

        return serverInfoStatus.getStatus();
    }

    // Be conservative when determining that lock expiration has elapsed by
    // taking into account the roundtrip delay of trying to get the local
    // time from the config server.
    Milliseconds delay(timer.millis() / 2);  // Assuming symmetrical delay.

    const auto& serverInfo = serverInfoStatus.getValue();

    stdx::lock_guard<stdx::mutex> lk(_mutex);
    auto pingIter = _pingHistory.find(lockDoc.getName());

    if (pingIter == _pingHistory.end()) {
        // We haven't seen this lock before, so we don't have any point of reference
        // to compare against and determine the elapsed time. Save the current ping info
        // for this lock.
        _pingHistory.emplace(std::piecewise_construct,
                             std::forward_as_tuple(lockDoc.getName()),
                             std::forward_as_tuple(processID,
                                                   pingValue,
                                                   serverInfo.serverTime,
                                                   lockDoc.getLockID(),
                                                   serverInfo.electionId));
        return false;
    }

    auto configServerLocalTime = serverInfo.serverTime - delay;

    auto* pingInfo = &pingIter->second;

    LOG(1) << "checking last ping for lock '" << lockDoc.getName() << "' against last seen process "
           << pingInfo->processId << " and ping " << pingInfo->lastPing;

    if (pingInfo->lastPing != pingValue ||  // ping is active

        // Owner of this lock is now different from last time, so we can't
        // use the ping data.
        pingInfo->lockSessionId != lockDoc.getLockID() ||

        // Primary changed; we can't trust that clocks are synchronized, so
        // treat this as if it were a new entry.
        pingInfo->electionId != serverInfo.electionId) {
        pingInfo->lastPing = pingValue;
        pingInfo->electionId = serverInfo.electionId;
        pingInfo->configLocalTime = configServerLocalTime;
        pingInfo->lockSessionId = lockDoc.getLockID();
        return false;
    }

    if (configServerLocalTime < pingInfo->configLocalTime) {
        warning() << "config server local time went backwards, from last seen: "
                  << pingInfo->configLocalTime << " to " << configServerLocalTime;
        return false;
    }

    Milliseconds elapsedSinceLastPing(configServerLocalTime - pingInfo->configLocalTime);
    if (elapsedSinceLastPing >= lockExpiration) {
        LOG(0) << "forcing lock '" << lockDoc.getName() << "' because elapsed time "
               << elapsedSinceLastPing << " >= takeover time " << lockExpiration;
        return true;
    }

    LOG(1) << "could not force lock '" << lockDoc.getName() << "' because elapsed time "
           << durationCount<Milliseconds>(elapsedSinceLastPing) << " ms < takeover time "
           << durationCount<Milliseconds>(lockExpiration) << " ms";
    return false;
}

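// Acquires the named distributed lock, blocking for up to 'waitFor'. Per the loop condition and
// break below, a zero 'waitFor' means a single acquisition attempt and a negative value means
// wait indefinitely. Returns the lock session id on success, LockBusy on timeout, or the first
// non-retriable error encountered.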
StatusWith<DistLockHandle> ReplSetDistLockManager::lockWithSessionID(OperationContext* opCtx,
                                                                     StringData name,
                                                                     StringData whyMessage,
                                                                     const OID& lockSessionID,
                                                                     Milliseconds waitFor) {
    Timer timer(_serviceContext->getTickSource());
    Timer msgTimer(_serviceContext->getTickSource());

    // Counts how many attempts to grab the lock have failed with a network error. This value
    // is reset for each lock acquisition attempt, because these are independent write operations.
    int networkErrorRetries = 0;

    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();

    // Distributed lock acquisition works by trying to update the state of the lock document to
    // 'taken'. If the lock is currently taken, we back off and retry the acquisition every
    // kLockRetryInterval until 'waitFor' has elapsed. If a network error occurs during an
    // acquisition attempt, the acquisition is retried immediately.
    while (waitFor <= Milliseconds::zero() || Milliseconds(timer.millis()) < waitFor) {
        const string who = str::stream() << _processID << ":" << getThreadName();

        auto lockExpiration = _lockExpiration;
        MONGO_FAIL_POINT_BLOCK(setDistLockTimeout, customTimeout) {
            const BSONObj& data = customTimeout.getData();
            lockExpiration = Milliseconds(data["timeoutMs"].numberInt());
        }

        LOG(1) << "trying to acquire new distributed lock for " << name
               << " ( lock timeout : " << durationCount<Milliseconds>(lockExpiration)
               << " ms, ping interval : " << durationCount<Milliseconds>(_pingInterval)
               << " ms, process : " << _processID << " )"
               << " with lockSessionID: " << lockSessionID << ", why: " << whyMessage.toString();

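        // grabLock issues a findAndModify against the lock document, atomically marking it as
        // taken only if it is currently unlocked. Conceptually (an illustrative sketch, not the
        // exact schema) the document looks like:
        //   { _id: <name>, state: <locked/unlocked>, ts: <lockSessionID>,
        //     process: <processID>, who: <who>, when: <date>, why: <whyMessage> }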
        auto lockResult = _catalog->grabLock(
            opCtx, name, lockSessionID, who, _processID, Date_t::now(), whyMessage.toString());

        auto status = lockResult.getStatus();

        if (status.isOK()) {
            // Lock is acquired since findAndModify was able to successfully modify
            // the lock document.
            log() << "distributed lock '" << name << "' acquired for '" << whyMessage.toString()
                  << "', ts : " << lockSessionID;
            return lockSessionID;
        }

        // If a network error occurred, unlock the lock synchronously and try again
        if (configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) &&
            networkErrorRetries < kMaxNumLockAcquireRetries) {
            LOG(1) << "Failed to acquire distributed lock because of retriable error. Retrying "
                      "acquisition by first unlocking the stale entry, which possibly exists now"
                   << causedBy(redact(status));

            networkErrorRetries++;

            status = _catalog->unlock(opCtx, lockSessionID, name);
            if (status.isOK()) {
                // We certainly do not own the lock, so we can retry
                continue;
            }

            // Fall-through to the error checking logic below
            invariant(status != ErrorCodes::LockStateChangeFailed);

            LOG(1)
                << "Failed to retry acquisition of distributed lock. No more attempts will be made"
                << causedBy(redact(status));
        }

        if (status != ErrorCodes::LockStateChangeFailed) {
            // An error occurred but the write might have actually been applied on the
            // other side. Schedule an unlock to clean it up just in case.
            queueUnlock(lockSessionID, name.toString());
            return status;
        }


        // Get info from the current lock and check if we can overtake it.
        auto getLockStatusResult = _catalog->getLockByName(opCtx, name);
        const auto& getLockStatus = getLockStatusResult.getStatus();

        if (!getLockStatusResult.isOK() && getLockStatus != ErrorCodes::LockNotFound) {
            return getLockStatus;
        }

        // Note: Only attempt to overtake locks that actually exist. If the lock was not
        // found, use the normal grab lock path to acquire it.
        if (getLockStatusResult.isOK()) {
            auto currentLock = getLockStatusResult.getValue();
            auto isLockExpiredResult = isLockExpired(opCtx, currentLock, lockExpiration);

            if (!isLockExpiredResult.isOK()) {
                return isLockExpiredResult.getStatus();
            }

            if (isLockExpiredResult.getValue() || (lockSessionID == currentLock.getLockID())) {
                auto overtakeResult = _catalog->overtakeLock(opCtx,
                                                             name,
                                                             lockSessionID,
                                                             currentLock.getLockID(),
                                                             who,
                                                             _processID,
                                                             Date_t::now(),
                                                             whyMessage);

                const auto& overtakeStatus = overtakeResult.getStatus();

                if (overtakeResult.isOK()) {
                    // Lock is acquired since findAndModify was able to successfully modify
                    // the lock document.

                    LOG(0) << "lock '" << name << "' successfully forced";
                    LOG(0) << "distributed lock '" << name << "' acquired, ts : " << lockSessionID;
                    return lockSessionID;
                }

                if (overtakeStatus != ErrorCodes::LockStateChangeFailed) {
                    // An error occurred but the write might have actually been applied on the
                    // other side. Schedule an unlock to clean it up just in case.
                    queueUnlock(lockSessionID, boost::none);
                    return overtakeStatus;
                }
            }
        }

        LOG(1) << "distributed lock '" << name << "' was not acquired.";

        if (waitFor == Milliseconds::zero()) {
            break;
        }

        // Periodically log a message for debugging purposes
        if (msgTimer.seconds() > 10) {
            LOG(0) << "waited " << timer.seconds() << "s for distributed lock " << name << " for "
                   << whyMessage.toString();

            msgTimer.reset();
        }

        // The previous attempt found the lock to be busy, so a fresh acquisition attempt begins
        // now; reset the network error retry counter.
        networkErrorRetries = 0;

        const Milliseconds timeRemaining =
            std::max(Milliseconds::zero(), waitFor - Milliseconds(timer.millis()));
        sleepFor(std::min(kLockRetryInterval, timeRemaining));
    }

    return {ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name};
}

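// Single-attempt variant of lock acquisition: issues one grabLock with local (non-majority)
// write concern, never waits or retries, and maps LockStateChangeFailed (the lock is currently
// held) to LockBusy for the caller.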
StatusWith<DistLockHandle> ReplSetDistLockManager::tryLockWithLocalWriteConcern(
    OperationContext* opCtx, StringData name, StringData whyMessage, const OID& lockSessionID) {
    const string who = str::stream() << _processID << ":" << getThreadName();

    LOG(1) << "trying to acquire new distributed lock for " << name
           << " ( lock timeout : " << durationCount<Milliseconds>(_lockExpiration)
           << " ms, ping interval : " << durationCount<Milliseconds>(_pingInterval)
           << " ms, process : " << _processID << " )"
           << " with lockSessionID: " << lockSessionID << ", why: " << whyMessage.toString();

    auto lockStatus = _catalog->grabLock(opCtx,
                                         name,
                                         lockSessionID,
                                         who,
                                         _processID,
                                         Date_t::now(),
                                         whyMessage.toString(),
                                         DistLockCatalog::kLocalWriteConcern);

    if (lockStatus.isOK()) {
        log() << "distributed lock '" << name << "' acquired for '" << whyMessage.toString()
              << "', ts : " << lockSessionID;
        return lockSessionID;
    }

    LOG(1) << "distributed lock '" << name << "' was not acquired.";

    if (lockStatus == ErrorCodes::LockStateChangeFailed) {
        return {ErrorCodes::LockBusy, str::stream() << "Unable to acquire " << name};
    }

    return lockStatus.getStatus();
}

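// Best-effort unlock by lock session id only. If the immediate unlock attempt fails, the request
// is queued so the pinger thread (doTask) keeps retrying it; the same applies to the overload
// below that additionally matches on the lock name.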
void ReplSetDistLockManager::unlock(OperationContext* opCtx, const DistLockHandle& lockSessionID) {
    auto unlockStatus = _catalog->unlock(opCtx, lockSessionID);

    if (!unlockStatus.isOK()) {
        queueUnlock(lockSessionID, boost::none);
    } else {
        LOG(0) << "distributed lock with " << LocksType::lockID() << ": '" << lockSessionID
               << "' unlocked.";
    }
}

void ReplSetDistLockManager::unlock(OperationContext* opCtx,
                                    const DistLockHandle& lockSessionID,
                                    StringData name) {
    auto unlockStatus = _catalog->unlock(opCtx, lockSessionID, name);

    if (!unlockStatus.isOK()) {
        queueUnlock(lockSessionID, name.toString());
    } else {
        LOG(0) << "distributed lock with " << LocksType::lockID() << ": '" << lockSessionID
               << "' and " << LocksType::name() << ": '" << name.toString() << "' unlocked.";
    }
}

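// Best-effort release of all distributed locks owned by the given process id. Failures are only
// logged; unlike the single-lock unlock overloads above, nothing is queued for retry here.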
void ReplSetDistLockManager::unlockAll(OperationContext* opCtx, const std::string& processID) {
    Status status = _catalog->unlockAll(opCtx, processID);
    if (!status.isOK()) {
        warning() << "Error while trying to unlock existing distributed locks"
                  << causedBy(redact(status));
    }
}

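// Checks that the lock with the given session id still exists in the catalog; the status of the
// lookup doubles as the status of the lock.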
Status ReplSetDistLockManager::checkStatus(OperationContext* opCtx,
                                           const DistLockHandle& lockHandle) {
    return _catalog->getLockByTS(opCtx, lockHandle).getStatus();
}

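// Defers an unlock to the pinger thread. The entry is a (session id, optional name) pair; doTask
// drains _unlockList on its next iteration and re-queues any attempts that fail.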
void ReplSetDistLockManager::queueUnlock(const DistLockHandle& lockSessionID,
                                         const boost::optional<std::string>& name) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    _unlockList.push_back(std::make_pair(lockSessionID, name));
}

}  // namespace mongo