1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/write_concern.h"
36
37 #include "mongo/base/counter.h"
38 #include "mongo/bson/util/bson_extract.h"
39 #include "mongo/db/client.h"
40 #include "mongo/db/commands/server_status_metric.h"
41 #include "mongo/db/operation_context.h"
42 #include "mongo/db/repl/optime.h"
43 #include "mongo/db/repl/replication_coordinator_global.h"
44 #include "mongo/db/server_options.h"
45 #include "mongo/db/service_context.h"
46 #include "mongo/db/stats/timer_stats.h"
47 #include "mongo/db/storage/storage_engine.h"
48 #include "mongo/db/write_concern_options.h"
49 #include "mongo/rpc/protocol.h"
50 #include "mongo/util/fail_point_service.h"
51 #include "mongo/util/log.h"
52
53 namespace mongo {
54
55 using std::string;
56 using repl::OpTime;
57
// Tracks how long write-concern replication waits take; surfaced in
// serverStatus as "getLastError.wtime" (recorded in waitForWriteConcern below).
static TimerStats gleWtimeStats;
static ServerStatusMetricField<TimerStats> displayGleLatency("getLastError.wtime", &gleWtimeStats);

// Counts replication waits that ended in a wtimeout; surfaced in serverStatus
// as "getLastError.wtimeouts".
static Counter64 gleWtimeouts;
static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts",
                                                              &gleWtimeouts);

// Test failpoint: pauses waitForWriteConcern before the wait begins. Only
// honored for external clients (see waitForWriteConcern).
MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern);
66
commandSpecifiesWriteConcern(const BSONObj & cmdObj)67 bool commandSpecifiesWriteConcern(const BSONObj& cmdObj) {
68 return cmdObj.hasField(WriteConcernOptions::kWriteConcernField);
69 }
70
extractWriteConcern(OperationContext * opCtx,const BSONObj & cmdObj,const std::string & dbName)71 StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* opCtx,
72 const BSONObj& cmdObj,
73 const std::string& dbName) {
74 // The default write concern if empty is {w:1}. Specifying {w:0} is/was allowed, but is
75 // interpreted identically to {w:1}.
76 auto wcResult = WriteConcernOptions::extractWCFromCommand(
77 cmdObj, dbName, repl::ReplicationCoordinator::get(opCtx)->getGetLastErrorDefault());
78 if (!wcResult.isOK()) {
79 return wcResult.getStatus();
80 }
81
82 WriteConcernOptions writeConcern = wcResult.getValue();
83
84 if (writeConcern.usedDefault) {
85 if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
86 !opCtx->getClient()->isInDirectClient()) {
87 // This is here only for backwards compatibility with 3.2 clusters which have commands
88 // that do not specify write concern when writing to the config server.
89 writeConcern = {
90 WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(30)};
91 }
92 } else {
93 Status wcStatus = validateWriteConcern(opCtx, writeConcern, dbName);
94 if (!wcStatus.isOK()) {
95 return wcStatus;
96 }
97 }
98
99 return writeConcern;
100 }
101
validateWriteConcern(OperationContext * opCtx,const WriteConcernOptions & writeConcern,StringData dbName)102 Status validateWriteConcern(OperationContext* opCtx,
103 const WriteConcernOptions& writeConcern,
104 StringData dbName) {
105 if (writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL &&
106 !opCtx->getServiceContext()->getGlobalStorageEngine()->isDurable()) {
107 return Status(ErrorCodes::BadValue,
108 "cannot use 'j' option when a host does not have journaling enabled");
109 }
110
111 const auto replMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
112
113 if (replMode == repl::ReplicationCoordinator::modeNone && writeConcern.wNumNodes > 1) {
114 return Status(ErrorCodes::BadValue, "cannot use 'w' > 1 when a host is not replicated");
115 }
116
117 if (replMode != repl::ReplicationCoordinator::modeReplSet && !writeConcern.wMode.empty() &&
118 writeConcern.wMode != WriteConcernOptions::kMajority) {
119 return Status(ErrorCodes::BadValue,
120 string("cannot use non-majority 'w' mode ") + writeConcern.wMode +
121 " when a host is not a member of a replica set");
122 }
123
124 return Status::OK();
125 }
126
appendTo(const WriteConcernOptions & writeConcern,BSONObjBuilder * result) const127 void WriteConcernResult::appendTo(const WriteConcernOptions& writeConcern,
128 BSONObjBuilder* result) const {
129 if (syncMillis >= 0)
130 result->appendNumber("syncMillis", syncMillis);
131
132 if (fsyncFiles >= 0)
133 result->appendNumber("fsyncFiles", fsyncFiles);
134
135 if (wTime >= 0) {
136 if (wTimedOut)
137 result->appendNumber("waited", wTime);
138 else
139 result->appendNumber("wtime", wTime);
140 }
141
142 if (wTimedOut)
143 result->appendBool("wtimeout", true);
144
145 if (writtenTo.size()) {
146 BSONArrayBuilder hosts(result->subarrayStart("writtenTo"));
147 for (size_t i = 0; i < writtenTo.size(); ++i) {
148 hosts.append(writtenTo[i].toString());
149 }
150 } else {
151 result->appendNull("writtenTo");
152 }
153
154 if (err.empty())
155 result->appendNull("err");
156 else
157 result->append("err", err);
158
159 // For ephemeral storage engines, 0 files may be fsynced
160 invariant(writeConcern.syncMode != WriteConcernOptions::SyncMode::FSYNC ||
161 (result->asTempObj()["fsyncFiles"].numberLong() >= 0 ||
162 !result->asTempObj()["waited"].eoo()));
163 }
164
/**
 * Blocks until 'replOpTime' satisfies 'writeConcern': first performs the local
 * durability portion (fsync/journal per the resolved sync mode), then waits on
 * replication via the ReplicationCoordinator.
 *
 * Populates 'result' with timing and host statistics (syncMillis, wTime,
 * writtenTo, wTimedOut/err on timeout) and returns the final status of the
 * wait, e.g. WriteConcernFailed on wtimeout.
 */
Status waitForWriteConcern(OperationContext* opCtx,
                           const OpTime& replOpTime,
                           const WriteConcernOptions& writeConcern,
                           WriteConcernResult* result) {
    LOG(2) << "Waiting for write concern. OpTime: " << replOpTime
           << ", write concern: " << writeConcern.toBSON();

    auto const replCoord = repl::ReplicationCoordinator::get(opCtx);

    if (!opCtx->getClient()->isInDirectClient()) {
        // Respecting this failpoint for internal clients prevents stepup from working properly.
        MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern);
    }

    // Next handle blocking on disk
    Timer syncTimer;
    // Resolve SyncMode::UNSET into a concrete mode based on the current
    // replication configuration before acting on it in the switch below.
    WriteConcernOptions writeConcernWithPopulatedSyncMode =
        replCoord->populateUnsetWriteConcernOptionsSyncMode(writeConcern);

    switch (writeConcernWithPopulatedSyncMode.syncMode) {
        case WriteConcernOptions::SyncMode::UNSET:
            // populateUnsetWriteConcernOptionsSyncMode() should have resolved UNSET;
            // reaching this case is a programming error.
            severe() << "Attempting to wait on a WriteConcern with an unset sync option";
            fassertFailed(34410);
        case WriteConcernOptions::SyncMode::NONE:
            break;
        case WriteConcernOptions::SyncMode::FSYNC: {
            StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
            if (!storageEngine->isDurable()) {
                // Non-durable engine: flush data files instead of the (absent) journal.
                result->fsyncFiles = storageEngine->flushAllFiles(opCtx, true);
            } else {
                // We only need to commit the journal if we're durable
                opCtx->recoveryUnit()->waitUntilDurable();
            }
            break;
        }
        case WriteConcernOptions::SyncMode::JOURNAL:
            if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeNone) {
                // Wait for ops to become durable then update replication system's
                // knowledge of this.
                // The applied optime is captured BEFORE the durability wait so the
                // durable optime is only advanced to what was already applied.
                OpTime appliedOpTime = replCoord->getMyLastAppliedOpTime();
                opCtx->recoveryUnit()->waitUntilDurable();
                replCoord->setMyLastDurableOpTimeForward(appliedOpTime);
            } else {
                opCtx->recoveryUnit()->waitUntilDurable();
            }
            break;
    }

    result->syncMillis = syncTimer.millis();

    // Now wait for replication

    if (replOpTime.isNull()) {
        // no write happened for this client yet
        return Status::OK();
    }

    // needed to avoid incrementing gleWtimeStats SERVER-9005
    if (writeConcernWithPopulatedSyncMode.wNumNodes <= 1 &&
        writeConcernWithPopulatedSyncMode.wMode.empty()) {
        // no desired replication check
        return Status::OK();
    }

    // Replica set stepdowns and gle mode changes are thrown as errors
    repl::ReplicationCoordinator::StatusAndDuration replStatus =
        replCoord->awaitReplication(opCtx, replOpTime, writeConcernWithPopulatedSyncMode);
    if (replStatus.status == ErrorCodes::WriteConcernFailed) {
        gleWtimeouts.increment();
        result->err = "timeout";
        result->wTimedOut = true;
    }

    // Add stats
    result->writtenTo = replCoord->getHostsWrittenTo(replOpTime,
                                                     writeConcernWithPopulatedSyncMode.syncMode ==
                                                         WriteConcernOptions::SyncMode::JOURNAL);
    gleWtimeStats.recordMillis(durationCount<Milliseconds>(replStatus.duration));
    result->wTime = durationCount<Milliseconds>(replStatus.duration);

    return replStatus.status;
}
247
248 } // namespace mongo
249