1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/repl/sync_source_feedback.h"
36 
37 #include "mongo/db/client.h"
38 #include "mongo/db/repl/bgsync.h"
39 #include "mongo/db/repl/repl_set_config.h"
40 #include "mongo/db/repl/replication_coordinator.h"
41 #include "mongo/db/repl/reporter.h"
42 #include "mongo/executor/task_executor.h"
43 #include "mongo/util/concurrency/idle_thread_block.h"
44 #include "mongo/util/log.h"
45 #include "mongo/util/net/hostandport.h"
46 #include "mongo/util/scopeguard.h"
47 #include "mongo/util/time_support.h"
48 
49 namespace mongo {
50 namespace repl {
51 
52 namespace {
53 
54 
55 const Milliseconds maximumKeepAliveIntervalMS(30 * 1000);
56 
57 // The network timeout used for replSetUpdatePosition requests made to a node's sync source.
58 const Seconds syncSourceFeedbackNetworkTimeoutSecs(30);
59 
60 /**
61  * Calculates the keep alive interval based on the given ReplSetConfig.
62  */
calculateKeepAliveInterval(const ReplSetConfig & rsConfig)63 Milliseconds calculateKeepAliveInterval(const ReplSetConfig& rsConfig) {
64     return std::min((rsConfig.getElectionTimeoutPeriod() / 2), maximumKeepAliveIntervalMS);
65 }
66 
67 /**
68  * Returns function to prepare update command
69  */
makePrepareReplSetUpdatePositionCommandFn(ReplicationCoordinator * replCoord,const HostAndPort & syncTarget,BackgroundSync * bgsync)70 Reporter::PrepareReplSetUpdatePositionCommandFn makePrepareReplSetUpdatePositionCommandFn(
71     ReplicationCoordinator* replCoord, const HostAndPort& syncTarget, BackgroundSync* bgsync) {
72     return [syncTarget, replCoord, bgsync]() -> StatusWith<BSONObj> {
73         auto currentSyncTarget = bgsync->getSyncTarget();
74         if (currentSyncTarget != syncTarget) {
75             if (currentSyncTarget.empty()) {
76                 // Sync source was cleared.
77                 return Status(ErrorCodes::InvalidSyncSource,
78                               str::stream() << "Sync source was cleared. Was " << syncTarget);
79 
80             } else {
81                 // Sync source changed.
82                 return Status(ErrorCodes::InvalidSyncSource,
83                               str::stream() << "Sync source changed from " << syncTarget << " to "
84                                             << currentSyncTarget);
85             }
86         }
87 
88         if (replCoord->getMemberState().primary()) {
89             // Primary has no one to send updates to.
90             return Status(ErrorCodes::InvalidSyncSource,
91                           "Currently primary - no one to send updates to");
92         }
93 
94         return replCoord->prepareReplSetUpdatePositionCommand();
95     };
96 }
97 
98 }  // namespace
99 
forwardSlaveProgress()100 void SyncSourceFeedback::forwardSlaveProgress() {
101     {
102         stdx::unique_lock<stdx::mutex> lock(_mtx);
103         _positionChanged = true;
104         _cond.notify_all();
105         if (_reporter) {
106             auto triggerStatus = _reporter->trigger();
107             if (!triggerStatus.isOK()) {
108                 warning() << "unable to forward slave progress to " << _reporter->getTarget()
109                           << ": " << triggerStatus;
110             }
111         }
112     }
113 }
114 
_updateUpstream(Reporter * reporter)115 Status SyncSourceFeedback::_updateUpstream(Reporter* reporter) {
116     auto syncTarget = reporter->getTarget();
117 
118     auto triggerStatus = reporter->trigger();
119     if (!triggerStatus.isOK()) {
120         warning() << "unable to schedule reporter to update replication progress on " << syncTarget
121                   << ": " << triggerStatus;
122         return triggerStatus;
123     }
124 
125     auto status = reporter->join();
126 
127     if (!status.isOK()) {
128         log() << "SyncSourceFeedback error sending update to " << syncTarget << ": " << status;
129     }
130 
131     // Sync source blacklisting will be done in BackgroundSync and SyncSourceResolver.
132 
133     return status;
134 }
135 
shutdown()136 void SyncSourceFeedback::shutdown() {
137     stdx::unique_lock<stdx::mutex> lock(_mtx);
138     if (_reporter) {
139         _reporter->shutdown();
140     }
141     _shutdownSignaled = true;
142     _cond.notify_all();
143 }
144 
run(executor::TaskExecutor * executor,BackgroundSync * bgsync,ReplicationCoordinator * replCoord)145 void SyncSourceFeedback::run(executor::TaskExecutor* executor,
146                              BackgroundSync* bgsync,
147                              ReplicationCoordinator* replCoord) {
148     Client::initThread("SyncSourceFeedback");
149 
150     HostAndPort syncTarget;
151 
152     // keepAliveInterval indicates how frequently to forward progress in the absence of updates.
153     Milliseconds keepAliveInterval(0);
154 
155     while (true) {  // breaks once _shutdownSignaled is true
156 
157         if (keepAliveInterval == Milliseconds(0)) {
158             keepAliveInterval = calculateKeepAliveInterval(replCoord->getConfig());
159         }
160 
161         {
162             // Take SyncSourceFeedback lock before calling into ReplicationCoordinator
163             // to avoid deadlock because ReplicationCoordinator could conceivably calling back into
164             // this class.
165             stdx::unique_lock<stdx::mutex> lock(_mtx);
166             while (!_positionChanged && !_shutdownSignaled) {
167                 {
168                     MONGO_IDLE_THREAD_BLOCK;
169                     if (_cond.wait_for(lock, keepAliveInterval.toSystemDuration()) !=
170                         stdx::cv_status::timeout) {
171                         continue;
172                     }
173                 }
174                 MemberState state = replCoord->getMemberState();
175                 if (!(state.primary() || state.startup())) {
176                     break;
177                 }
178             }
179 
180             if (_shutdownSignaled) {
181                 break;
182             }
183 
184             _positionChanged = false;
185         }
186 
187         {
188             stdx::lock_guard<stdx::mutex> lock(_mtx);
189             MemberState state = replCoord->getMemberState();
190             if (state.primary() || state.startup()) {
191                 continue;
192             }
193         }
194 
195         const HostAndPort target = bgsync->getSyncTarget();
196         // Log sync source changes.
197         if (target.empty()) {
198             if (syncTarget != target) {
199                 syncTarget = target;
200             }
201             // Loop back around again; the keepalive functionality will cause us to retry
202             continue;
203         }
204 
205         if (syncTarget != target) {
206             LOG(1) << "setting syncSourceFeedback to " << target;
207             syncTarget = target;
208 
209             // Update keepalive value from config.
210             auto oldKeepAliveInterval = keepAliveInterval;
211             keepAliveInterval = calculateKeepAliveInterval(replCoord->getConfig());
212             if (oldKeepAliveInterval != keepAliveInterval) {
213                 LOG(1) << "new syncSourceFeedback keep alive duration = " << keepAliveInterval
214                        << " (previously " << oldKeepAliveInterval << ")";
215             }
216         }
217 
218         Reporter reporter(executor,
219                           makePrepareReplSetUpdatePositionCommandFn(replCoord, syncTarget, bgsync),
220                           syncTarget,
221                           keepAliveInterval,
222                           syncSourceFeedbackNetworkTimeoutSecs);
223         {
224             stdx::lock_guard<stdx::mutex> lock(_mtx);
225             if (_shutdownSignaled) {
226                 break;
227             }
228             _reporter = &reporter;
229         }
230         ON_BLOCK_EXIT([this]() {
231             stdx::lock_guard<stdx::mutex> lock(_mtx);
232             _reporter = nullptr;
233         });
234 
235         auto status = _updateUpstream(&reporter);
236         if (!status.isOK()) {
237             LOG(1) << "The replication progress command (replSetUpdatePosition) failed and will be "
238                       "retried: "
239                    << status;
240         }
241     }
242 }
243 
244 }  // namespace repl
245 }  // namespace mongo
246