1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/base/string_data.h"
36 #include "mongo/db/catalog/collection_options.h"
37 #include "mongo/db/client.h"
38 #include "mongo/db/jsobj.h"
39 #include "mongo/db/operation_context.h"
40 #include "mongo/db/repl/replication_consistency_markers.h"
41 #include "mongo/db/repl/replication_process.h"
42 #include "mongo/db/repl/rollback_gen.h"
43 #include "mongo/db/repl/storage_interface.h"
44 #include "mongo/db/service_context.h"
45 #include "mongo/util/assert_util.h"
46 #include "mongo/util/log.h"
47 #include "mongo/util/mongoutils/str.h"
48
49 namespace mongo {
50 namespace repl {
51
52
53 namespace {
54
55 const auto getReplicationProcess =
56 ServiceContext::declareDecoration<std::unique_ptr<ReplicationProcess>>();
57
58 } // namespace
59
get(ServiceContext * service)60 ReplicationProcess* ReplicationProcess::get(ServiceContext* service) {
61 return getReplicationProcess(service).get();
62 }
63
get(ServiceContext & service)64 ReplicationProcess* ReplicationProcess::get(ServiceContext& service) {
65 return getReplicationProcess(service).get();
66 }
67
get(OperationContext * opCtx)68 ReplicationProcess* ReplicationProcess::get(OperationContext* opCtx) {
69 return get(opCtx->getClient()->getServiceContext());
70 }
71
72
set(ServiceContext * service,std::unique_ptr<ReplicationProcess> process)73 void ReplicationProcess::set(ServiceContext* service, std::unique_ptr<ReplicationProcess> process) {
74 auto& replicationProcess = getReplicationProcess(service);
75 replicationProcess = std::move(process);
76 }
77
ReplicationProcess(StorageInterface * storageInterface,std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers,std::unique_ptr<ReplicationRecovery> recovery)78 ReplicationProcess::ReplicationProcess(
79 StorageInterface* storageInterface,
80 std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers,
81 std::unique_ptr<ReplicationRecovery> recovery)
82 : _storageInterface(storageInterface),
83 _consistencyMarkers(std::move(consistencyMarkers)),
84 _recovery(std::move(recovery)),
85 _rbid(kUninitializedRollbackId) {}
86
refreshRollbackID(OperationContext * opCtx)87 Status ReplicationProcess::refreshRollbackID(OperationContext* opCtx) {
88 stdx::lock_guard<stdx::mutex> lock(_mutex);
89
90 auto rbidResult = _storageInterface->getRollbackID(opCtx);
91 if (!rbidResult.isOK()) {
92 return rbidResult.getStatus();
93 }
94
95 if (kUninitializedRollbackId == _rbid) {
96 log() << "Rollback ID is " << rbidResult.getValue();
97 } else {
98 log() << "Rollback ID is " << rbidResult.getValue() << " (previously " << _rbid << ")";
99 }
100 _rbid = rbidResult.getValue();
101
102 return Status::OK();
103 }
104
getRollbackID() const105 int ReplicationProcess::getRollbackID() const {
106 stdx::lock_guard<stdx::mutex> lock(_mutex);
107 if (kUninitializedRollbackId == _rbid) {
108 // This may happen when serverStatus is called by an internal client before we have a chance
109 // to read the rollback ID from storage.
110 warning() << "Rollback ID is not initialized yet.";
111 }
112 return _rbid;
113 }
114
initializeRollbackID(OperationContext * opCtx)115 Status ReplicationProcess::initializeRollbackID(OperationContext* opCtx) {
116 stdx::lock_guard<stdx::mutex> lock(_mutex);
117
118 invariant(kUninitializedRollbackId == _rbid);
119
120 // Do not make any assumptions about the starting value of the rollback ID in the
121 // local.system.rollback.id collection other than it cannot be "kUninitializedRollbackId".
122 // Cache the rollback ID in _rbid to be returned the next time getRollbackID() is called.
123
124 auto initRbidSW = _storageInterface->initializeRollbackID(opCtx);
125 if (initRbidSW.isOK()) {
126 log() << "Initialized the rollback ID to " << initRbidSW.getValue();
127 _rbid = initRbidSW.getValue();
128 invariant(kUninitializedRollbackId != _rbid);
129 } else {
130 warning() << "Failed to initialize the rollback ID: " << initRbidSW.getStatus().reason();
131 }
132 return initRbidSW.getStatus();
133 }
134
incrementRollbackID(OperationContext * opCtx)135 Status ReplicationProcess::incrementRollbackID(OperationContext* opCtx) {
136 stdx::lock_guard<stdx::mutex> lock(_mutex);
137
138 auto status = _storageInterface->incrementRollbackID(opCtx);
139
140 // If the rollback ID was incremented successfully, cache the new value in _rbid to be returned
141 // the next time getRollbackID() is called.
142 if (status.isOK()) {
143 log() << "Incremented the rollback ID to " << status.getValue();
144 _rbid = status.getValue();
145 invariant(kUninitializedRollbackId != _rbid);
146 } else {
147 warning() << "Failed to increment the rollback ID: " << status.getStatus().reason();
148 }
149
150 return status.getStatus();
151 }
152
getConsistencyMarkers()153 ReplicationConsistencyMarkers* ReplicationProcess::getConsistencyMarkers() {
154 return _consistencyMarkers.get();
155 }
156
getReplicationRecovery()157 ReplicationRecovery* ReplicationProcess::getReplicationRecovery() {
158 return _recovery.get();
159 }
160
161 } // namespace repl
162 } // namespace mongo
163