1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32
#include "mongo/platform/basic.h"

#include "mongo/db/repl/sync_source_feedback.h"

#include <algorithm>

#include "mongo/db/client.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
48
49 namespace mongo {
50 namespace repl {
51
52 namespace {
53
54
55 const Milliseconds maximumKeepAliveIntervalMS(30 * 1000);
56
57 // The network timeout used for replSetUpdatePosition requests made to a node's sync source.
58 const Seconds syncSourceFeedbackNetworkTimeoutSecs(30);
59
60 /**
61 * Calculates the keep alive interval based on the given ReplSetConfig.
62 */
calculateKeepAliveInterval(const ReplSetConfig & rsConfig)63 Milliseconds calculateKeepAliveInterval(const ReplSetConfig& rsConfig) {
64 return std::min((rsConfig.getElectionTimeoutPeriod() / 2), maximumKeepAliveIntervalMS);
65 }
66
67 /**
68 * Returns function to prepare update command
69 */
makePrepareReplSetUpdatePositionCommandFn(ReplicationCoordinator * replCoord,const HostAndPort & syncTarget,BackgroundSync * bgsync)70 Reporter::PrepareReplSetUpdatePositionCommandFn makePrepareReplSetUpdatePositionCommandFn(
71 ReplicationCoordinator* replCoord, const HostAndPort& syncTarget, BackgroundSync* bgsync) {
72 return [syncTarget, replCoord, bgsync]() -> StatusWith<BSONObj> {
73 auto currentSyncTarget = bgsync->getSyncTarget();
74 if (currentSyncTarget != syncTarget) {
75 if (currentSyncTarget.empty()) {
76 // Sync source was cleared.
77 return Status(ErrorCodes::InvalidSyncSource,
78 str::stream() << "Sync source was cleared. Was " << syncTarget);
79
80 } else {
81 // Sync source changed.
82 return Status(ErrorCodes::InvalidSyncSource,
83 str::stream() << "Sync source changed from " << syncTarget << " to "
84 << currentSyncTarget);
85 }
86 }
87
88 if (replCoord->getMemberState().primary()) {
89 // Primary has no one to send updates to.
90 return Status(ErrorCodes::InvalidSyncSource,
91 "Currently primary - no one to send updates to");
92 }
93
94 return replCoord->prepareReplSetUpdatePositionCommand();
95 };
96 }
97
98 } // namespace
99
forwardSlaveProgress()100 void SyncSourceFeedback::forwardSlaveProgress() {
101 {
102 stdx::unique_lock<stdx::mutex> lock(_mtx);
103 _positionChanged = true;
104 _cond.notify_all();
105 if (_reporter) {
106 auto triggerStatus = _reporter->trigger();
107 if (!triggerStatus.isOK()) {
108 warning() << "unable to forward slave progress to " << _reporter->getTarget()
109 << ": " << triggerStatus;
110 }
111 }
112 }
113 }
114
_updateUpstream(Reporter * reporter)115 Status SyncSourceFeedback::_updateUpstream(Reporter* reporter) {
116 auto syncTarget = reporter->getTarget();
117
118 auto triggerStatus = reporter->trigger();
119 if (!triggerStatus.isOK()) {
120 warning() << "unable to schedule reporter to update replication progress on " << syncTarget
121 << ": " << triggerStatus;
122 return triggerStatus;
123 }
124
125 auto status = reporter->join();
126
127 if (!status.isOK()) {
128 log() << "SyncSourceFeedback error sending update to " << syncTarget << ": " << status;
129 }
130
131 // Sync source blacklisting will be done in BackgroundSync and SyncSourceResolver.
132
133 return status;
134 }
135
shutdown()136 void SyncSourceFeedback::shutdown() {
137 stdx::unique_lock<stdx::mutex> lock(_mtx);
138 if (_reporter) {
139 _reporter->shutdown();
140 }
141 _shutdownSignaled = true;
142 _cond.notify_all();
143 }
144
run(executor::TaskExecutor * executor,BackgroundSync * bgsync,ReplicationCoordinator * replCoord)145 void SyncSourceFeedback::run(executor::TaskExecutor* executor,
146 BackgroundSync* bgsync,
147 ReplicationCoordinator* replCoord) {
148 Client::initThread("SyncSourceFeedback");
149
150 HostAndPort syncTarget;
151
152 // keepAliveInterval indicates how frequently to forward progress in the absence of updates.
153 Milliseconds keepAliveInterval(0);
154
155 while (true) { // breaks once _shutdownSignaled is true
156
157 if (keepAliveInterval == Milliseconds(0)) {
158 keepAliveInterval = calculateKeepAliveInterval(replCoord->getConfig());
159 }
160
161 {
162 // Take SyncSourceFeedback lock before calling into ReplicationCoordinator
163 // to avoid deadlock because ReplicationCoordinator could conceivably calling back into
164 // this class.
165 stdx::unique_lock<stdx::mutex> lock(_mtx);
166 while (!_positionChanged && !_shutdownSignaled) {
167 {
168 MONGO_IDLE_THREAD_BLOCK;
169 if (_cond.wait_for(lock, keepAliveInterval.toSystemDuration()) !=
170 stdx::cv_status::timeout) {
171 continue;
172 }
173 }
174 MemberState state = replCoord->getMemberState();
175 if (!(state.primary() || state.startup())) {
176 break;
177 }
178 }
179
180 if (_shutdownSignaled) {
181 break;
182 }
183
184 _positionChanged = false;
185 }
186
187 {
188 stdx::lock_guard<stdx::mutex> lock(_mtx);
189 MemberState state = replCoord->getMemberState();
190 if (state.primary() || state.startup()) {
191 continue;
192 }
193 }
194
195 const HostAndPort target = bgsync->getSyncTarget();
196 // Log sync source changes.
197 if (target.empty()) {
198 if (syncTarget != target) {
199 syncTarget = target;
200 }
201 // Loop back around again; the keepalive functionality will cause us to retry
202 continue;
203 }
204
205 if (syncTarget != target) {
206 LOG(1) << "setting syncSourceFeedback to " << target;
207 syncTarget = target;
208
209 // Update keepalive value from config.
210 auto oldKeepAliveInterval = keepAliveInterval;
211 keepAliveInterval = calculateKeepAliveInterval(replCoord->getConfig());
212 if (oldKeepAliveInterval != keepAliveInterval) {
213 LOG(1) << "new syncSourceFeedback keep alive duration = " << keepAliveInterval
214 << " (previously " << oldKeepAliveInterval << ")";
215 }
216 }
217
218 Reporter reporter(executor,
219 makePrepareReplSetUpdatePositionCommandFn(replCoord, syncTarget, bgsync),
220 syncTarget,
221 keepAliveInterval,
222 syncSourceFeedbackNetworkTimeoutSecs);
223 {
224 stdx::lock_guard<stdx::mutex> lock(_mtx);
225 if (_shutdownSignaled) {
226 break;
227 }
228 _reporter = &reporter;
229 }
230 ON_BLOCK_EXIT([this]() {
231 stdx::lock_guard<stdx::mutex> lock(_mtx);
232 _reporter = nullptr;
233 });
234
235 auto status = _updateUpstream(&reporter);
236 if (!status.isOK()) {
237 LOG(1) << "The replication progress command (replSetUpdatePosition) failed and will be "
238 "retried: "
239 << status;
240 }
241 }
242 }
243
244 } // namespace repl
245 } // namespace mongo
246