1
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/executor/connection_pool_asio.h"
36
37 #include <asio.hpp>
38
39 #include "mongo/executor/async_stream_factory_interface.h"
40 #include "mongo/executor/network_interface_asio.h"
41 #include "mongo/rpc/factory.h"
42 #include "mongo/rpc/legacy_request_builder.h"
43 #include "mongo/rpc/reply_interface.h"
44 #include "mongo/stdx/memory.h"
45 #include "mongo/util/log.h"
46
47 namespace mongo {
48 namespace executor {
49 namespace connection_pool_asio {
50
ASIOTimer(asio::io_service::strand * strand)51 ASIOTimer::ASIOTimer(asio::io_service::strand* strand)
52 : _strand(strand),
53 _impl(strand->get_io_service()),
54 _callbackSharedState(std::make_shared<CallbackSharedState>()) {}
55
~ASIOTimer()56 ASIOTimer::~ASIOTimer() {
57 stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
58 ++_callbackSharedState->id;
59 }
60
61 const auto kMaxTimerDuration = duration_cast<Milliseconds>(ASIOTimer::clock_type::duration::max());
62
setTimeout(Milliseconds timeout,TimeoutCallback cb)63 void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
64 _strand->dispatch([this, timeout, cb] {
65 _cb = std::move(cb);
66
67 cancelTimeout();
68
69 try {
70 _impl.expires_after(std::min(kMaxTimerDuration, timeout).toSystemDuration());
71 } catch (const asio::system_error& ec) {
72 severe() << "Failed to set connection pool timer: " << ec.what();
73 fassertFailed(40333);
74 }
75
76 decltype(_callbackSharedState->id) id;
77 decltype(_callbackSharedState) sharedState;
78
79 {
80 stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
81 id = ++_callbackSharedState->id;
82 sharedState = _callbackSharedState;
83 }
84
85 _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) {
86 if (error == asio::error::operation_aborted) {
87 return;
88 }
89
90 stdx::unique_lock<stdx::mutex> lk(sharedState->mutex);
91
92 // If the id in shared state doesn't match the id in our callback, it
93 // means we were cancelled, but still executed. This can occur if we
94 // were cancelled just before our timeout. We need a generation, rather
95 // than just a bool here, because we could have been cancelled and
96 // another callback set, in which case we shouldn't run and the we
97 // should let the other callback execute instead.
98 if (sharedState->id == id) {
99 auto cb = std::move(_cb);
100 lk.unlock();
101 cb();
102 }
103 }));
104 });
105 }
106
cancelTimeout()107 void ASIOTimer::cancelTimeout() {
108 decltype(_callbackSharedState) sharedState;
109 decltype(_callbackSharedState->id) id;
110 {
111 stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
112 id = ++_callbackSharedState->id;
113 sharedState = _callbackSharedState;
114 }
115 _strand->dispatch([this, id, sharedState] {
116 stdx::lock_guard<stdx::mutex> lk(sharedState->mutex);
117 if (sharedState->id != id)
118 return;
119
120 std::error_code ec;
121 _impl.cancel(ec);
122 if (ec) {
123 log() << "Failed to cancel connection pool timer: " << ec.message();
124 }
125 });
126 }
127
ASIOConnection(const HostAndPort & hostAndPort,size_t generation,ASIOImpl * global)128 ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global)
129 : _global(global),
130 _hostAndPort(hostAndPort),
131 _generation(generation),
132 _impl(makeAsyncOp(this)),
133 _timer(&_impl->strand()) {}
134
~ASIOConnection()135 ASIOConnection::~ASIOConnection() {
136 if (_impl) {
137 stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
138 _impl->_access->id++;
139 }
140 }
141
indicateSuccess()142 void ASIOConnection::indicateSuccess() {
143 _status = Status::OK();
144 }
145
indicateFailure(Status status)146 void ASIOConnection::indicateFailure(Status status) {
147 invariant(!status.isOK());
148 _status = std::move(status);
149 }
150
getHostAndPort() const151 const HostAndPort& ASIOConnection::getHostAndPort() const {
152 return _hostAndPort;
153 }
154
isHealthy()155 bool ASIOConnection::isHealthy() {
156 // Check if the remote host has closed the connection.
157 return _impl->connection().stream().isOpen();
158 }
159
indicateUsed()160 void ASIOConnection::indicateUsed() {
161 // It is illegal to attempt to use a connection after calling indicateFailure().
162 invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown);
163 _lastUsed = _global->now();
164 }
165
getLastUsed() const166 Date_t ASIOConnection::getLastUsed() const {
167 return _lastUsed;
168 }
169
getStatus() const170 const Status& ASIOConnection::getStatus() const {
171 return _status;
172 }
173
getGeneration() const174 size_t ASIOConnection::getGeneration() const {
175 return _generation;
176 }
177
makeAsyncOp(ASIOConnection * conn)178 std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOConnection* conn) {
179 return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>(
180 conn->_global->_impl,
181 TaskExecutor::CallbackHandle(),
182 RemoteCommandRequest{conn->getHostAndPort(),
183 std::string("admin"),
184 BSON("isMaster" << 1),
185 BSONObj(),
186 nullptr},
187 [conn](const TaskExecutor::ResponseStatus& rs) {
188 auto cb = std::move(conn->_setupCallback);
189 cb(conn, rs.status);
190 },
191 conn->_global->now());
192 }
193
makeIsMasterRequest(ASIOConnection * conn)194 Message ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) {
195 return rpc::legacyRequestFromOpMsgRequest(
196 OpMsgRequest::fromDBAndBody("admin", BSON("isMaster" << 1)));
197 }
198
setTimeout(Milliseconds timeout,TimeoutCallback cb)199 void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
200 _timer.setTimeout(timeout, std::move(cb));
201 }
202
cancelTimeout()203 void ASIOConnection::cancelTimeout() {
204 _timer.cancelTimeout();
205 }
206
setup(Milliseconds timeout,SetupCallback cb)207 void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) {
208 _impl->strand().dispatch([this, timeout, cb] {
209 _setupCallback = [this, cb](ConnectionInterface* ptr, Status status) {
210 {
211 stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
212 _impl->_access->id++;
213
214 // If our connection timeout callback ran but wasn't the reason we exited
215 // the state machine, clear any TIMED_OUT state.
216 if (status.isOK()) {
217 _impl->_transitionToState_inlock(
218 NetworkInterfaceASIO::AsyncOp::State::kUninitialized);
219 _impl->_transitionToState_inlock(
220 NetworkInterfaceASIO::AsyncOp::State::kInProgress);
221 _impl->_transitionToState_inlock(
222 NetworkInterfaceASIO::AsyncOp::State::kFinished);
223 }
224 }
225
226 cancelTimeout();
227 cb(ptr, status);
228 };
229
230 // Capturing the shared access pad and generation before calling setTimeout gives us enough
231 // information to avoid calling the timer if we shouldn't without needing any other
232 // resources that might have been cleaned up.
233 decltype(_impl->_access) access;
234 std::size_t generation;
235 {
236 stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
237 access = _impl->_access;
238 generation = access->id;
239 }
240
241 // Actually timeout setup
242 setTimeout(timeout, [this, access, generation] {
243 stdx::lock_guard<stdx::mutex> lk(access->mutex);
244 if (generation != access->id) {
245 // The operation has been cleaned up, do not access.
246 return;
247 }
248 _impl->timeOut_inlock();
249 });
250
251 _global->_impl->_connect(_impl.get());
252 });
253 }
254
resetToUnknown()255 void ASIOConnection::resetToUnknown() {
256 _status = ConnectionPool::kConnectionStateUnknown;
257 }
258
refresh(Milliseconds timeout,RefreshCallback cb)259 void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
260 _impl->strand().dispatch([this, timeout, cb] {
261 auto op = _impl.get();
262
263 // We clear state transitions because we're re-running a portion of the asio state machine
264 // without entering in startCommand (which would call this for us).
265 op->clearStateTransitions();
266
267 _refreshCallback = std::move(cb);
268
269 // Actually timeout refreshes
270 setTimeout(timeout, [this] { _impl->connection().stream().cancel(); });
271
272 // Our pings are isMaster's
273 auto beginStatus = op->beginCommand(makeIsMasterRequest(this), _hostAndPort);
274 if (!beginStatus.isOK()) {
275 auto cb = std::move(_refreshCallback);
276 cb(this, beginStatus);
277 return;
278 }
279
280 // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such
281 // we
282 // need to intercept those calls so we can capture them. This will get cleared out when we
283 // fill
284 // in the real onFinish in startCommand.
285 op->setOnFinish([this](RemoteCommandResponse failedResponse) {
286 invariant(!failedResponse.isOK());
287 auto cb = std::move(_refreshCallback);
288 cb(this, failedResponse.status);
289 });
290
291 op->_inRefresh = true;
292
293 _global->_impl->_asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) {
294 cancelTimeout();
295
296 auto cb = std::move(_refreshCallback);
297
298 if (ec)
299 return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
300
301 op->_inRefresh = false;
302 cb(this, Status::OK());
303 });
304 });
305 }
306
releaseAsyncOp()307 std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() {
308 {
309 stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
310 _impl->_access->id++;
311 }
312 return std::move(_impl);
313 }
314
bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op)315 void ASIOConnection::bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op) {
316 _impl = std::move(op);
317 }
318
ASIOImpl(NetworkInterfaceASIO * impl)319 ASIOImpl::ASIOImpl(NetworkInterfaceASIO* impl) : _impl(impl) {}
320
now()321 Date_t ASIOImpl::now() {
322 return _impl->now();
323 }
324
makeTimer()325 std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() {
326 return stdx::make_unique<ASIOTimer>(&_impl->_strand);
327 }
328
makeConnection(const HostAndPort & hostAndPort,size_t generation)329 std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection(
330 const HostAndPort& hostAndPort, size_t generation) {
331 return stdx::make_unique<ASIOConnection>(hostAndPort, generation, this);
332 }
333
334 } // namespace connection_pool_asio
335 } // namespace executor
336 } // namespace mongo
337