1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/s/catalog/replset_dist_lock_manager.h"
36
37 #include "mongo/base/status.h"
38 #include "mongo/base/status_with.h"
39 #include "mongo/db/operation_context_noop.h"
40 #include "mongo/db/service_context.h"
41 #include "mongo/s/catalog/dist_lock_catalog.h"
42 #include "mongo/s/catalog/type_lockpings.h"
43 #include "mongo/s/catalog/type_locks.h"
44 #include "mongo/s/client/shard_registry.h"
45 #include "mongo/s/grid.h"
46 #include "mongo/stdx/chrono.h"
47 #include "mongo/stdx/memory.h"
48 #include "mongo/util/concurrency/idle_thread_block.h"
49 #include "mongo/util/concurrency/thread_name.h"
50 #include "mongo/util/fail_point_service.h"
51 #include "mongo/util/log.h"
52 #include "mongo/util/mongoutils/str.h"
53 #include "mongo/util/time_support.h"
54 #include "mongo/util/timer.h"
55
namespace mongo {

// Failpoint allowing tests to override the lock expiration time used by
// lockWithSessionID (reads "timeoutMs" from the failpoint data).
MONGO_FP_DECLARE(setDistLockTimeout);

using std::string;
using std::unique_ptr;

namespace {

// How many times to retry acquiring the lock after the first attempt fails
const int kMaxNumLockAcquireRetries = 2;

// How frequently to poll the distributed lock when it is found to be locked
const Milliseconds kLockRetryInterval(500);

}  // namespace

// How often the background pinger thread (doTask) refreshes this process' ping document.
const Seconds ReplSetDistLockManager::kDistLockPingInterval{30};

// How stale a holder's ping may become before its lock is eligible for overtaking.
const Minutes ReplSetDistLockManager::kDistLockExpirationTime{15};
75
/**
 * Constructs the manager.
 *
 * 'processID' identifies this process in the config server's lockpings/locks
 * collections; 'catalog' performs the actual reads and writes against the config
 * server; 'pingInterval' is how often the background pinger thread runs (see doTask)
 * and 'lockExpiration' is how stale a holder's ping must be before its lock may be
 * overtaken (see lockWithSessionID/isLockExpired).
 */
ReplSetDistLockManager::ReplSetDistLockManager(ServiceContext* globalContext,
                                               StringData processID,
                                               unique_ptr<DistLockCatalog> catalog,
                                               Milliseconds pingInterval,
                                               Milliseconds lockExpiration)
    : _serviceContext(globalContext),
      _processID(processID.toString()),
      _catalog(std::move(catalog)),
      _pingInterval(pingInterval),
      _lockExpiration(lockExpiration) {}
86
87 ReplSetDistLockManager::~ReplSetDistLockManager() = default;
88
startUp()89 void ReplSetDistLockManager::startUp() {
90 if (!_execThread) {
91 _execThread = stdx::make_unique<stdx::thread>(&ReplSetDistLockManager::doTask, this);
92 }
93 }
94
/**
 * Signals the pinger thread to stop, joins it, and removes this process' ping
 * document from the config server. Failure to remove the ping document is logged
 * but not propagated.
 */
void ReplSetDistLockManager::shutDown(OperationContext* opCtx) {
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _isShutDown = true;
        _shutDownCV.notify_all();
    }

    // Don't grab _mutex, otherwise will deadlock trying to join. Safe to read
    // _execThread since it is modified only at startUp().
    if (_execThread && _execThread->joinable()) {
        _execThread->join();
        _execThread.reset();
    }

    auto status = _catalog->stopPing(opCtx, _processID);
    if (!status.isOK()) {
        warning() << "error encountered while cleaning up distributed ping entry for " << _processID
                  << causedBy(redact(status));
    }
}
115
// Returns the identifier under which this process pings and acquires locks.
std::string ReplSetDistLockManager::getProcessID() {
    return _processID;
}
119
// Thread-safe read of the shutdown flag set by shutDown().
bool ReplSetDistLockManager::isShutDown() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _isShutDown;
}
124
/**
 * Body of the background pinger thread (started by startUp). Loops until shutdown:
 *  - refreshes this process' ping document so other nodes can see we are alive;
 *  - drains _unlockList, retrying unlocks that previously failed.
 * Sleeps for _pingInterval between iterations; shutDown() wakes it early via
 * _shutDownCV.
 */
void ReplSetDistLockManager::doTask() {
    LOG(0) << "creating distributed lock ping thread for process " << _processID
           << " (sleeping for " << _pingInterval << ")";

    Timer elapsedSincelastPing(_serviceContext->getTickSource());
    Client::initThread("replSetDistLockPinger");

    while (!isShutDown()) {
        {
            auto opCtx = cc().makeOperationContext();
            auto pingStatus = _catalog->ping(opCtx.get(), _processID, Date_t::now());

            // NotMaster is expected while not primary, so only warn on other errors.
            if (!pingStatus.isOK() && pingStatus != ErrorCodes::NotMaster) {
                warning() << "pinging failed for distributed lock pinger" << causedBy(pingStatus);
            }

            // Warn if this thread fell far behind schedule: a stale ping can cause
            // other nodes to consider this process' locks expired.
            const Milliseconds elapsed(elapsedSincelastPing.millis());
            if (elapsed > 10 * _pingInterval) {
                warning() << "Lock pinger for proc: " << _processID << " was inactive for "
                          << elapsed << " ms";
            }
            elapsedSincelastPing.reset();

            // Swap out the pending unlock requests under the mutex so the network
            // calls below happen without holding it.
            std::deque<std::pair<DistLockHandle, boost::optional<std::string>>> toUnlockBatch;
            {
                stdx::unique_lock<stdx::mutex> lk(_mutex);
                toUnlockBatch.swap(_unlockList);
            }

            for (const auto& toUnlock : toUnlockBatch) {
                std::string nameMessage = "";
                Status unlockStatus(ErrorCodes::NotYetInitialized,
                                    "status unlock not initialized!");
                if (toUnlock.second) {
                    // A non-empty _id (name) field was provided, unlock by ts (sessionId) and _id.
                    unlockStatus = _catalog->unlock(opCtx.get(), toUnlock.first, *toUnlock.second);
                    nameMessage = " and " + LocksType::name() + ": " + *toUnlock.second;
                } else {
                    unlockStatus = _catalog->unlock(opCtx.get(), toUnlock.first);
                }

                if (!unlockStatus.isOK()) {
                    warning() << "Failed to unlock lock with " << LocksType::lockID() << ": "
                              << toUnlock.first << nameMessage << causedBy(unlockStatus);
                    // Queue another attempt, unless the problem was no longer being primary.
                    if (unlockStatus != ErrorCodes::NotMaster) {
                        queueUnlock(toUnlock.first, toUnlock.second);
                    }
                } else {
                    LOG(0) << "distributed lock with " << LocksType::lockID() << ": "
                           << toUnlock.first << nameMessage << " unlocked.";
                }

                if (isShutDown()) {
                    return;
                }
            }
        }

        stdx::unique_lock<stdx::mutex> lk(_mutex);
        MONGO_IDLE_THREAD_BLOCK;
        _shutDownCV.wait_for(lk, _pingInterval.toSystemDuration(), [this] { return _isShutDown; });
    }
}
189
/**
 * Determines whether the lock described by 'lockDoc' may be overtaken because its
 * holder has not pinged for at least 'lockExpiration' (as measured in config server
 * local time, tracked per lock in _pingHistory).
 *
 * Returns true if the lock is expired, false if it is still considered held (or if
 * this is the first observation of the lock, or the primary/owner changed so the
 * timing data cannot be trusted), and an error status on catalog failures. A
 * NotMaster response from getServerInfo is treated as "not expired" rather than an
 * error.
 */
StatusWith<bool> ReplSetDistLockManager::isLockExpired(OperationContext* opCtx,
                                                       LocksType lockDoc,
                                                       const Milliseconds& lockExpiration) {
    const auto& processID = lockDoc.getProcess();
    auto pingStatus = _catalog->getPing(opCtx, processID);

    Date_t pingValue;
    if (pingStatus.isOK()) {
        const auto& pingDoc = pingStatus.getValue();
        Status pingDocValidationStatus = pingDoc.validate();
        if (!pingDocValidationStatus.isOK()) {
            return {ErrorCodes::UnsupportedFormat,
                    str::stream() << "invalid ping document for " << processID << ": "
                                  << pingDocValidationStatus.toString()};
        }

        pingValue = pingDoc.getPing();
    } else if (pingStatus.getStatus() != ErrorCodes::NoMatchingDocument) {
        return pingStatus.getStatus();
    }  // else use default pingValue if ping document does not exist.

    Timer timer(_serviceContext->getTickSource());
    auto serverInfoStatus = _catalog->getServerInfo(opCtx);
    if (!serverInfoStatus.isOK()) {
        if (serverInfoStatus.getStatus() == ErrorCodes::NotMaster) {
            return false;
        }

        return serverInfoStatus.getStatus();
    }

    // Be conservative when determining that lock expiration has elapsed by
    // taking into account the roundtrip delay of trying to get the local
    // time from the config server.
    Milliseconds delay(timer.millis() / 2);  // Assuming symmetrical delay.

    const auto& serverInfo = serverInfoStatus.getValue();

    stdx::lock_guard<stdx::mutex> lk(_mutex);
    auto pingIter = _pingHistory.find(lockDoc.getName());

    if (pingIter == _pingHistory.end()) {
        // We haven't seen this lock before so we don't have any point of reference
        // to compare and determine the elapsed time. Save the current ping info
        // for this lock.
        _pingHistory.emplace(std::piecewise_construct,
                             std::forward_as_tuple(lockDoc.getName()),
                             std::forward_as_tuple(processID,
                                                   pingValue,
                                                   serverInfo.serverTime,
                                                   lockDoc.getLockID(),
                                                   serverInfo.electionId));
        return false;
    }

    auto configServerLocalTime = serverInfo.serverTime - delay;

    auto* pingInfo = &pingIter->second;

    LOG(1) << "checking last ping for lock '" << lockDoc.getName() << "' against last seen process "
           << pingInfo->processId << " and ping " << pingInfo->lastPing;

    if (pingInfo->lastPing != pingValue ||  // ping is active

        // Owner of this lock is now different from last time so we can't
        // use the ping data.
        pingInfo->lockSessionId != lockDoc.getLockID() ||

        // Primary changed, we can't trust that clocks are synchronized so
        // treat as if this is a new entry.
        pingInfo->electionId != serverInfo.electionId) {
        // Restart the expiration countdown from this observation.
        pingInfo->lastPing = pingValue;
        pingInfo->electionId = serverInfo.electionId;
        pingInfo->configLocalTime = configServerLocalTime;
        pingInfo->lockSessionId = lockDoc.getLockID();
        return false;
    }

    if (configServerLocalTime < pingInfo->configLocalTime) {
        warning() << "config server local time went backwards, from last seen: "
                  << pingInfo->configLocalTime << " to " << configServerLocalTime;
        return false;
    }

    Milliseconds elapsedSinceLastPing(configServerLocalTime - pingInfo->configLocalTime);
    if (elapsedSinceLastPing >= lockExpiration) {
        LOG(0) << "forcing lock '" << lockDoc.getName() << "' because elapsed time "
               << elapsedSinceLastPing << " >= takeover time " << lockExpiration;
        return true;
    }

    LOG(1) << "could not force lock '" << lockDoc.getName() << "' because elapsed time "
           << durationCount<Milliseconds>(elapsedSinceLastPing) << " < takeover time "
           << durationCount<Milliseconds>(lockExpiration) << " ms";
    return false;
}
286
/**
 * Attempts to acquire the distributed lock 'name' using the given session id,
 * retrying until 'waitFor' elapses (waitFor <= 0 means retry indefinitely;
 * waitFor == 0 means a single attempt). On each attempt it tries grabLock first;
 * if the lock is held, it checks via isLockExpired whether the current holder's
 * lease has lapsed and, if so, overtakes the lock. Retriable (network) errors
 * trigger up to kMaxNumLockAcquireRetries immediate retries after a synchronous
 * unlock of the possibly-written stale entry. Returns the session id on success,
 * LockBusy on timeout, or the underlying error.
 */
StatusWith<DistLockHandle> ReplSetDistLockManager::lockWithSessionID(OperationContext* opCtx,
                                                                     StringData name,
                                                                     StringData whyMessage,
                                                                     const OID& lockSessionID,
                                                                     Milliseconds waitFor) {
    Timer timer(_serviceContext->getTickSource());
    Timer msgTimer(_serviceContext->getTickSource());

    // Counts how many attempts have been made to grab the lock, which have failed with network
    // error. This value is reset for each lock acquisition attempt because these are
    // independent write operations.
    int networkErrorRetries = 0;

    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();

    // Distributed lock acquisition works by trying to update the state of the lock to 'taken'. If
    // the lock is currently taken, we will back off and try the acquisition again, repeating this
    // until the lockTryInterval has been reached. If a network error occurs at each lock
    // acquisition attempt, the lock acquisition will be retried immediately.
    while (waitFor <= Milliseconds::zero() || Milliseconds(timer.millis()) < waitFor) {
        const string who = str::stream() << _processID << ":" << getThreadName();

        // Allow tests to shrink/extend the expiration via the setDistLockTimeout failpoint.
        auto lockExpiration = _lockExpiration;
        MONGO_FAIL_POINT_BLOCK(setDistLockTimeout, customTimeout) {
            const BSONObj& data = customTimeout.getData();
            lockExpiration = Milliseconds(data["timeoutMs"].numberInt());
        }

        LOG(1) << "trying to acquire new distributed lock for " << name
               << " ( lock timeout : " << durationCount<Milliseconds>(lockExpiration)
               << " ms, ping interval : " << durationCount<Milliseconds>(_pingInterval)
               << " ms, process : " << _processID << " )"
               << " with lockSessionID: " << lockSessionID << ", why: " << whyMessage.toString();

        auto lockResult = _catalog->grabLock(
            opCtx, name, lockSessionID, who, _processID, Date_t::now(), whyMessage.toString());

        auto status = lockResult.getStatus();

        if (status.isOK()) {
            // Lock is acquired since findAndModify was able to successfully modify
            // the lock document.
            log() << "distributed lock '" << name << "' acquired for '" << whyMessage.toString()
                  << "', ts : " << lockSessionID;
            return lockSessionID;
        }

        // If a network error occurred, unlock the lock synchronously and try again
        if (configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) &&
            networkErrorRetries < kMaxNumLockAcquireRetries) {
            LOG(1) << "Failed to acquire distributed lock because of retriable error. Retrying "
                      "acquisition by first unlocking the stale entry, which possibly exists now"
                   << causedBy(redact(status));

            networkErrorRetries++;

            status = _catalog->unlock(opCtx, lockSessionID, name);
            if (status.isOK()) {
                // We certainly do not own the lock, so we can retry
                continue;
            }

            // Fall-through to the error checking logic below
            invariant(status != ErrorCodes::LockStateChangeFailed);

            LOG(1)
                << "Failed to retry acquisition of distributed lock. No more attempts will be made"
                << causedBy(redact(status));
        }

        if (status != ErrorCodes::LockStateChangeFailed) {
            // An error occurred but the write might have actually been applied on the
            // other side. Schedule an unlock to clean it up just in case.
            queueUnlock(lockSessionID, name.toString());
            return status;
        }

        // LockStateChangeFailed from grabLock means the lock is currently held.
        // Get info from current lock and check if we can overtake it.
        auto getLockStatusResult = _catalog->getLockByName(opCtx, name);
        const auto& getLockStatus = getLockStatusResult.getStatus();

        if (!getLockStatusResult.isOK() && getLockStatus != ErrorCodes::LockNotFound) {
            return getLockStatus;
        }

        // Note: Only attempt to overtake locks that actually exists. If lock was not
        // found, use the normal grab lock path to acquire it.
        if (getLockStatusResult.isOK()) {
            auto currentLock = getLockStatusResult.getValue();
            auto isLockExpiredResult = isLockExpired(opCtx, currentLock, lockExpiration);

            if (!isLockExpiredResult.isOK()) {
                return isLockExpiredResult.getStatus();
            }

            // Overtake when expired, or when we already own it under the same session id.
            if (isLockExpiredResult.getValue() || (lockSessionID == currentLock.getLockID())) {
                auto overtakeResult = _catalog->overtakeLock(opCtx,
                                                             name,
                                                             lockSessionID,
                                                             currentLock.getLockID(),
                                                             who,
                                                             _processID,
                                                             Date_t::now(),
                                                             whyMessage);

                const auto& overtakeStatus = overtakeResult.getStatus();

                if (overtakeResult.isOK()) {
                    // Lock is acquired since findAndModify was able to successfully modify
                    // the lock document.

                    LOG(0) << "lock '" << name << "' successfully forced";
                    LOG(0) << "distributed lock '" << name << "' acquired, ts : " << lockSessionID;
                    return lockSessionID;
                }

                if (overtakeStatus != ErrorCodes::LockStateChangeFailed) {
                    // An error occurred but the write might have actually been applied on the
                    // other side. Schedule an unlock to clean it up just in case.
                    queueUnlock(lockSessionID, boost::none);
                    return overtakeStatus;
                }
            }
        }

        LOG(1) << "distributed lock '" << name << "' was not acquired.";

        if (waitFor == Milliseconds::zero()) {
            break;
        }

        // Periodically message for debugging reasons
        if (msgTimer.seconds() > 10) {
            LOG(0) << "waited " << timer.seconds() << "s for distributed lock " << name << " for "
                   << whyMessage.toString();

            msgTimer.reset();
        }

        // A new lock acquisition attempt will begin now (because the previous found the lock to be
        // busy, so reset the retries counter)
        networkErrorRetries = 0;

        const Milliseconds timeRemaining =
            std::max(Milliseconds::zero(), waitFor - Milliseconds(timer.millis()));
        sleepFor(std::min(kLockRetryInterval, timeRemaining));
    }

    return {ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name};
}
437
/**
 * Single-shot acquisition of the distributed lock 'name' using local write concern
 * (no retries, no overtake logic). Returns the session id on success, LockBusy if
 * the catalog reports LockStateChangeFailed (lock already held), or the underlying
 * error otherwise.
 */
StatusWith<DistLockHandle> ReplSetDistLockManager::tryLockWithLocalWriteConcern(
    OperationContext* opCtx, StringData name, StringData whyMessage, const OID& lockSessionID) {
    const string who = str::stream() << _processID << ":" << getThreadName();

    LOG(1) << "trying to acquire new distributed lock for " << name
           << " ( lock timeout : " << durationCount<Milliseconds>(_lockExpiration)
           << " ms, ping interval : " << durationCount<Milliseconds>(_pingInterval)
           << " ms, process : " << _processID << " )"
           << " with lockSessionID: " << lockSessionID << ", why: " << whyMessage.toString();

    auto lockStatus = _catalog->grabLock(opCtx,
                                         name,
                                         lockSessionID,
                                         who,
                                         _processID,
                                         Date_t::now(),
                                         whyMessage.toString(),
                                         DistLockCatalog::kLocalWriteConcern);

    if (lockStatus.isOK()) {
        log() << "distributed lock '" << name << "' acquired for '" << whyMessage.toString()
              << "', ts : " << lockSessionID;
        return lockSessionID;
    }

    LOG(1) << "distributed lock '" << name << "' was not acquired.";

    if (lockStatus == ErrorCodes::LockStateChangeFailed) {
        return {ErrorCodes::LockBusy, str::stream() << "Unable to acquire " << name};
    }

    return lockStatus.getStatus();
}
471
unlock(OperationContext * opCtx,const DistLockHandle & lockSessionID)472 void ReplSetDistLockManager::unlock(OperationContext* opCtx, const DistLockHandle& lockSessionID) {
473 auto unlockStatus = _catalog->unlock(opCtx, lockSessionID);
474
475 if (!unlockStatus.isOK()) {
476 queueUnlock(lockSessionID, boost::none);
477 } else {
478 LOG(0) << "distributed lock with " << LocksType::lockID() << ": " << lockSessionID
479 << "' unlocked.";
480 }
481 }
482
unlock(OperationContext * opCtx,const DistLockHandle & lockSessionID,StringData name)483 void ReplSetDistLockManager::unlock(OperationContext* opCtx,
484 const DistLockHandle& lockSessionID,
485 StringData name) {
486 auto unlockStatus = _catalog->unlock(opCtx, lockSessionID, name);
487
488 if (!unlockStatus.isOK()) {
489 queueUnlock(lockSessionID, name.toString());
490 } else {
491 LOG(0) << "distributed lock with " << LocksType::lockID() << ": '" << lockSessionID
492 << "' and " << LocksType::name() << ": '" << name.toString() << "' unlocked.";
493 }
494 }
495
unlockAll(OperationContext * opCtx,const std::string & processID)496 void ReplSetDistLockManager::unlockAll(OperationContext* opCtx, const std::string& processID) {
497 Status status = _catalog->unlockAll(opCtx, processID);
498 if (!status.isOK()) {
499 warning() << "Error while trying to unlock existing distributed locks"
500 << causedBy(redact(status));
501 }
502 }
503
// Checks that the lock with the given session id (ts) still exists in the catalog;
// returns the status of that lookup.
Status ReplSetDistLockManager::checkStatus(OperationContext* opCtx,
                                           const DistLockHandle& lockHandle) {
    return _catalog->getLockByTS(opCtx, lockHandle).getStatus();
}
508
queueUnlock(const DistLockHandle & lockSessionID,const boost::optional<std::string> & name)509 void ReplSetDistLockManager::queueUnlock(const DistLockHandle& lockSessionID,
510 const boost::optional<std::string>& name) {
511 stdx::unique_lock<stdx::mutex> lk(_mutex);
512 _unlockList.push_back(std::make_pair(lockSessionID, name));
513 }
514
515 } // namespace mongo
516