1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/write_concern.h"
36 
37 #include "mongo/base/counter.h"
38 #include "mongo/bson/util/bson_extract.h"
39 #include "mongo/db/client.h"
40 #include "mongo/db/commands/server_status_metric.h"
41 #include "mongo/db/operation_context.h"
42 #include "mongo/db/repl/optime.h"
43 #include "mongo/db/repl/replication_coordinator_global.h"
44 #include "mongo/db/server_options.h"
45 #include "mongo/db/service_context.h"
46 #include "mongo/db/stats/timer_stats.h"
47 #include "mongo/db/storage/storage_engine.h"
48 #include "mongo/db/write_concern_options.h"
49 #include "mongo/rpc/protocol.h"
50 #include "mongo/util/fail_point_service.h"
51 #include "mongo/util/log.h"
52 
53 namespace mongo {
54 
55 using std::string;
56 using repl::OpTime;
57 
// Server-status metric "getLastError.wtime": waitForWriteConcern() records into it the duration
// reported by ReplicationCoordinator::awaitReplication() whenever a replication wait is performed.
static TimerStats gleWtimeStats;
static ServerStatusMetricField<TimerStats> displayGleLatency("getLastError.wtime", &gleWtimeStats);

// Server-status metric "getLastError.wtimeouts": waitForWriteConcern() increments it each time a
// replication wait ends with ErrorCodes::WriteConcernFailed.
static Counter64 gleWtimeouts;
static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts",
                                                              &gleWtimeouts);

// Fail point consulted at the start of waitForWriteConcern() for operations that are not running
// through a direct (in-process) client; while it is set, the waiting thread is paused.
MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern);
66 
commandSpecifiesWriteConcern(const BSONObj & cmdObj)67 bool commandSpecifiesWriteConcern(const BSONObj& cmdObj) {
68     return cmdObj.hasField(WriteConcernOptions::kWriteConcernField);
69 }
70 
extractWriteConcern(OperationContext * opCtx,const BSONObj & cmdObj,const std::string & dbName)71 StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* opCtx,
72                                                     const BSONObj& cmdObj,
73                                                     const std::string& dbName) {
74     // The default write concern if empty is {w:1}. Specifying {w:0} is/was allowed, but is
75     // interpreted identically to {w:1}.
76     auto wcResult = WriteConcernOptions::extractWCFromCommand(
77         cmdObj, dbName, repl::ReplicationCoordinator::get(opCtx)->getGetLastErrorDefault());
78     if (!wcResult.isOK()) {
79         return wcResult.getStatus();
80     }
81 
82     WriteConcernOptions writeConcern = wcResult.getValue();
83 
84     if (writeConcern.usedDefault) {
85         if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
86             !opCtx->getClient()->isInDirectClient()) {
87             // This is here only for backwards compatibility with 3.2 clusters which have commands
88             // that do not specify write concern when writing to the config server.
89             writeConcern = {
90                 WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(30)};
91         }
92     } else {
93         Status wcStatus = validateWriteConcern(opCtx, writeConcern, dbName);
94         if (!wcStatus.isOK()) {
95             return wcStatus;
96         }
97     }
98 
99     return writeConcern;
100 }
101 
validateWriteConcern(OperationContext * opCtx,const WriteConcernOptions & writeConcern,StringData dbName)102 Status validateWriteConcern(OperationContext* opCtx,
103                             const WriteConcernOptions& writeConcern,
104                             StringData dbName) {
105     if (writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL &&
106         !opCtx->getServiceContext()->getGlobalStorageEngine()->isDurable()) {
107         return Status(ErrorCodes::BadValue,
108                       "cannot use 'j' option when a host does not have journaling enabled");
109     }
110 
111     const auto replMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
112 
113     if (replMode == repl::ReplicationCoordinator::modeNone && writeConcern.wNumNodes > 1) {
114         return Status(ErrorCodes::BadValue, "cannot use 'w' > 1 when a host is not replicated");
115     }
116 
117     if (replMode != repl::ReplicationCoordinator::modeReplSet && !writeConcern.wMode.empty() &&
118         writeConcern.wMode != WriteConcernOptions::kMajority) {
119         return Status(ErrorCodes::BadValue,
120                       string("cannot use non-majority 'w' mode ") + writeConcern.wMode +
121                           " when a host is not a member of a replica set");
122     }
123 
124     return Status::OK();
125 }
126 
appendTo(const WriteConcernOptions & writeConcern,BSONObjBuilder * result) const127 void WriteConcernResult::appendTo(const WriteConcernOptions& writeConcern,
128                                   BSONObjBuilder* result) const {
129     if (syncMillis >= 0)
130         result->appendNumber("syncMillis", syncMillis);
131 
132     if (fsyncFiles >= 0)
133         result->appendNumber("fsyncFiles", fsyncFiles);
134 
135     if (wTime >= 0) {
136         if (wTimedOut)
137             result->appendNumber("waited", wTime);
138         else
139             result->appendNumber("wtime", wTime);
140     }
141 
142     if (wTimedOut)
143         result->appendBool("wtimeout", true);
144 
145     if (writtenTo.size()) {
146         BSONArrayBuilder hosts(result->subarrayStart("writtenTo"));
147         for (size_t i = 0; i < writtenTo.size(); ++i) {
148             hosts.append(writtenTo[i].toString());
149         }
150     } else {
151         result->appendNull("writtenTo");
152     }
153 
154     if (err.empty())
155         result->appendNull("err");
156     else
157         result->append("err", err);
158 
159     // For ephemeral storage engines, 0 files may be fsynced
160     invariant(writeConcern.syncMode != WriteConcernOptions::SyncMode::FSYNC ||
161               (result->asTempObj()["fsyncFiles"].numberLong() >= 0 ||
162                !result->asTempObj()["waited"].eoo()));
163 }
164 
waitForWriteConcern(OperationContext * opCtx,const OpTime & replOpTime,const WriteConcernOptions & writeConcern,WriteConcernResult * result)165 Status waitForWriteConcern(OperationContext* opCtx,
166                            const OpTime& replOpTime,
167                            const WriteConcernOptions& writeConcern,
168                            WriteConcernResult* result) {
169     LOG(2) << "Waiting for write concern. OpTime: " << replOpTime
170            << ", write concern: " << writeConcern.toBSON();
171 
172     auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
173 
174     if (!opCtx->getClient()->isInDirectClient()) {
175         // Respecting this failpoint for internal clients prevents stepup from working properly.
176         MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern);
177     }
178 
179     // Next handle blocking on disk
180     Timer syncTimer;
181     WriteConcernOptions writeConcernWithPopulatedSyncMode =
182         replCoord->populateUnsetWriteConcernOptionsSyncMode(writeConcern);
183 
184     switch (writeConcernWithPopulatedSyncMode.syncMode) {
185         case WriteConcernOptions::SyncMode::UNSET:
186             severe() << "Attempting to wait on a WriteConcern with an unset sync option";
187             fassertFailed(34410);
188         case WriteConcernOptions::SyncMode::NONE:
189             break;
190         case WriteConcernOptions::SyncMode::FSYNC: {
191             StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
192             if (!storageEngine->isDurable()) {
193                 result->fsyncFiles = storageEngine->flushAllFiles(opCtx, true);
194             } else {
195                 // We only need to commit the journal if we're durable
196                 opCtx->recoveryUnit()->waitUntilDurable();
197             }
198             break;
199         }
200         case WriteConcernOptions::SyncMode::JOURNAL:
201             if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeNone) {
202                 // Wait for ops to become durable then update replication system's
203                 // knowledge of this.
204                 OpTime appliedOpTime = replCoord->getMyLastAppliedOpTime();
205                 opCtx->recoveryUnit()->waitUntilDurable();
206                 replCoord->setMyLastDurableOpTimeForward(appliedOpTime);
207             } else {
208                 opCtx->recoveryUnit()->waitUntilDurable();
209             }
210             break;
211     }
212 
213     result->syncMillis = syncTimer.millis();
214 
215     // Now wait for replication
216 
217     if (replOpTime.isNull()) {
218         // no write happened for this client yet
219         return Status::OK();
220     }
221 
222     // needed to avoid incrementing gleWtimeStats SERVER-9005
223     if (writeConcernWithPopulatedSyncMode.wNumNodes <= 1 &&
224         writeConcernWithPopulatedSyncMode.wMode.empty()) {
225         // no desired replication check
226         return Status::OK();
227     }
228 
229     // Replica set stepdowns and gle mode changes are thrown as errors
230     repl::ReplicationCoordinator::StatusAndDuration replStatus =
231         replCoord->awaitReplication(opCtx, replOpTime, writeConcernWithPopulatedSyncMode);
232     if (replStatus.status == ErrorCodes::WriteConcernFailed) {
233         gleWtimeouts.increment();
234         result->err = "timeout";
235         result->wTimedOut = true;
236     }
237 
238     // Add stats
239     result->writtenTo = replCoord->getHostsWrittenTo(replOpTime,
240                                                      writeConcernWithPopulatedSyncMode.syncMode ==
241                                                          WriteConcernOptions::SyncMode::JOURNAL);
242     gleWtimeStats.recordMillis(durationCount<Milliseconds>(replStatus.duration));
243     result->wTime = durationCount<Milliseconds>(replStatus.duration);
244 
245     return replStatus.status;
246 }
247 
248 }  // namespace mongo
249