1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/executor/connection_pool_asio.h"
36 
37 #include <asio.hpp>
38 
39 #include "mongo/executor/async_stream_factory_interface.h"
40 #include "mongo/executor/network_interface_asio.h"
41 #include "mongo/rpc/factory.h"
42 #include "mongo/rpc/legacy_request_builder.h"
43 #include "mongo/rpc/reply_interface.h"
44 #include "mongo/stdx/memory.h"
45 #include "mongo/util/log.h"
46 
47 namespace mongo {
48 namespace executor {
49 namespace connection_pool_asio {
50 
ASIOTimer(asio::io_service::strand * strand)51 ASIOTimer::ASIOTimer(asio::io_service::strand* strand)
52     : _strand(strand),
53       _impl(strand->get_io_service()),
54       _callbackSharedState(std::make_shared<CallbackSharedState>()) {}
55 
~ASIOTimer()56 ASIOTimer::~ASIOTimer() {
57     stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
58     ++_callbackSharedState->id;
59 }
60 
61 const auto kMaxTimerDuration = duration_cast<Milliseconds>(ASIOTimer::clock_type::duration::max());
62 
setTimeout(Milliseconds timeout,TimeoutCallback cb)63 void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
64     _strand->dispatch([this, timeout, cb] {
65         _cb = std::move(cb);
66 
67         cancelTimeout();
68 
69         try {
70             _impl.expires_after(std::min(kMaxTimerDuration, timeout).toSystemDuration());
71         } catch (const asio::system_error& ec) {
72             severe() << "Failed to set connection pool timer: " << ec.what();
73             fassertFailed(40333);
74         }
75 
76         decltype(_callbackSharedState->id) id;
77         decltype(_callbackSharedState) sharedState;
78 
79         {
80             stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
81             id = ++_callbackSharedState->id;
82             sharedState = _callbackSharedState;
83         }
84 
85         _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) {
86             if (error == asio::error::operation_aborted) {
87                 return;
88             }
89 
90             stdx::unique_lock<stdx::mutex> lk(sharedState->mutex);
91 
92             // If the id in shared state doesn't match the id in our callback, it
93             // means we were cancelled, but still executed. This can occur if we
94             // were cancelled just before our timeout. We need a generation, rather
95             // than just a bool here, because we could have been cancelled and
96             // another callback set, in which case we shouldn't run and the we
97             // should let the other callback execute instead.
98             if (sharedState->id == id) {
99                 auto cb = std::move(_cb);
100                 lk.unlock();
101                 cb();
102             }
103         }));
104     });
105 }
106 
cancelTimeout()107 void ASIOTimer::cancelTimeout() {
108     decltype(_callbackSharedState) sharedState;
109     decltype(_callbackSharedState->id) id;
110     {
111         stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
112         id = ++_callbackSharedState->id;
113         sharedState = _callbackSharedState;
114     }
115     _strand->dispatch([this, id, sharedState] {
116         stdx::lock_guard<stdx::mutex> lk(sharedState->mutex);
117         if (sharedState->id != id)
118             return;
119 
120         std::error_code ec;
121         _impl.cancel(ec);
122         if (ec) {
123             log() << "Failed to cancel connection pool timer: " << ec.message();
124         }
125     });
126 }
127 
ASIOConnection(const HostAndPort & hostAndPort,size_t generation,ASIOImpl * global)128 ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global)
129     : _global(global),
130       _hostAndPort(hostAndPort),
131       _generation(generation),
132       _impl(makeAsyncOp(this)),
133       _timer(&_impl->strand()) {}
134 
~ASIOConnection()135 ASIOConnection::~ASIOConnection() {
136     if (_impl) {
137         stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
138         _impl->_access->id++;
139     }
140 }
141 
indicateSuccess()142 void ASIOConnection::indicateSuccess() {
143     _status = Status::OK();
144 }
145 
indicateFailure(Status status)146 void ASIOConnection::indicateFailure(Status status) {
147     invariant(!status.isOK());
148     _status = std::move(status);
149 }
150 
getHostAndPort() const151 const HostAndPort& ASIOConnection::getHostAndPort() const {
152     return _hostAndPort;
153 }
154 
isHealthy()155 bool ASIOConnection::isHealthy() {
156     // Check if the remote host has closed the connection.
157     return _impl->connection().stream().isOpen();
158 }
159 
indicateUsed()160 void ASIOConnection::indicateUsed() {
161     // It is illegal to attempt to use a connection after calling indicateFailure().
162     invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown);
163     _lastUsed = _global->now();
164 }
165 
getLastUsed() const166 Date_t ASIOConnection::getLastUsed() const {
167     return _lastUsed;
168 }
169 
getStatus() const170 const Status& ASIOConnection::getStatus() const {
171     return _status;
172 }
173 
getGeneration() const174 size_t ASIOConnection::getGeneration() const {
175     return _generation;
176 }
177 
makeAsyncOp(ASIOConnection * conn)178 std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOConnection* conn) {
179     return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>(
180         conn->_global->_impl,
181         TaskExecutor::CallbackHandle(),
182         RemoteCommandRequest{conn->getHostAndPort(),
183                              std::string("admin"),
184                              BSON("isMaster" << 1),
185                              BSONObj(),
186                              nullptr},
187         [conn](const TaskExecutor::ResponseStatus& rs) {
188             auto cb = std::move(conn->_setupCallback);
189             cb(conn, rs.status);
190         },
191         conn->_global->now());
192 }
193 
makeIsMasterRequest(ASIOConnection * conn)194 Message ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) {
195     return rpc::legacyRequestFromOpMsgRequest(
196         OpMsgRequest::fromDBAndBody("admin", BSON("isMaster" << 1)));
197 }
198 
setTimeout(Milliseconds timeout,TimeoutCallback cb)199 void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
200     _timer.setTimeout(timeout, std::move(cb));
201 }
202 
cancelTimeout()203 void ASIOConnection::cancelTimeout() {
204     _timer.cancelTimeout();
205 }
206 
setup(Milliseconds timeout,SetupCallback cb)207 void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) {
208     _impl->strand().dispatch([this, timeout, cb] {
209         _setupCallback = [this, cb](ConnectionInterface* ptr, Status status) {
210             {
211                 stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
212                 _impl->_access->id++;
213 
214                 // If our connection timeout callback ran but wasn't the reason we exited
215                 // the state machine, clear any TIMED_OUT state.
216                 if (status.isOK()) {
217                     _impl->_transitionToState_inlock(
218                         NetworkInterfaceASIO::AsyncOp::State::kUninitialized);
219                     _impl->_transitionToState_inlock(
220                         NetworkInterfaceASIO::AsyncOp::State::kInProgress);
221                     _impl->_transitionToState_inlock(
222                         NetworkInterfaceASIO::AsyncOp::State::kFinished);
223                 }
224             }
225 
226             cancelTimeout();
227             cb(ptr, status);
228         };
229 
230         // Capturing the shared access pad and generation before calling setTimeout gives us enough
231         // information to avoid calling the timer if we shouldn't without needing any other
232         // resources that might have been cleaned up.
233         decltype(_impl->_access) access;
234         std::size_t generation;
235         {
236             stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
237             access = _impl->_access;
238             generation = access->id;
239         }
240 
241         // Actually timeout setup
242         setTimeout(timeout, [this, access, generation] {
243             stdx::lock_guard<stdx::mutex> lk(access->mutex);
244             if (generation != access->id) {
245                 // The operation has been cleaned up, do not access.
246                 return;
247             }
248             _impl->timeOut_inlock();
249         });
250 
251         _global->_impl->_connect(_impl.get());
252     });
253 }
254 
resetToUnknown()255 void ASIOConnection::resetToUnknown() {
256     _status = ConnectionPool::kConnectionStateUnknown;
257 }
258 
refresh(Milliseconds timeout,RefreshCallback cb)259 void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
260     _impl->strand().dispatch([this, timeout, cb] {
261         auto op = _impl.get();
262 
263         // We clear state transitions because we're re-running a portion of the asio state machine
264         // without entering in startCommand (which would call this for us).
265         op->clearStateTransitions();
266 
267         _refreshCallback = std::move(cb);
268 
269         // Actually timeout refreshes
270         setTimeout(timeout, [this] { _impl->connection().stream().cancel(); });
271 
272         // Our pings are isMaster's
273         auto beginStatus = op->beginCommand(makeIsMasterRequest(this), _hostAndPort);
274         if (!beginStatus.isOK()) {
275             auto cb = std::move(_refreshCallback);
276             cb(this, beginStatus);
277             return;
278         }
279 
280         // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such
281         // we
282         // need to intercept those calls so we can capture them. This will get cleared out when we
283         // fill
284         // in the real onFinish in startCommand.
285         op->setOnFinish([this](RemoteCommandResponse failedResponse) {
286             invariant(!failedResponse.isOK());
287             auto cb = std::move(_refreshCallback);
288             cb(this, failedResponse.status);
289         });
290 
291         op->_inRefresh = true;
292 
293         _global->_impl->_asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) {
294             cancelTimeout();
295 
296             auto cb = std::move(_refreshCallback);
297 
298             if (ec)
299                 return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
300 
301             op->_inRefresh = false;
302             cb(this, Status::OK());
303         });
304     });
305 }
306 
releaseAsyncOp()307 std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() {
308     {
309         stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
310         _impl->_access->id++;
311     }
312     return std::move(_impl);
313 }
314 
bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op)315 void ASIOConnection::bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op) {
316     _impl = std::move(op);
317 }
318 
ASIOImpl(NetworkInterfaceASIO * impl)319 ASIOImpl::ASIOImpl(NetworkInterfaceASIO* impl) : _impl(impl) {}
320 
now()321 Date_t ASIOImpl::now() {
322     return _impl->now();
323 }
324 
makeTimer()325 std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() {
326     return stdx::make_unique<ASIOTimer>(&_impl->_strand);
327 }
328 
makeConnection(const HostAndPort & hostAndPort,size_t generation)329 std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection(
330     const HostAndPort& hostAndPort, size_t generation) {
331     return stdx::make_unique<ASIOConnection>(hostAndPort, generation, this);
332 }
333 
334 }  // namespace connection_pool_asio
335 }  // namespace executor
336 }  // namespace mongo
337