1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/transport/transport_layer_asio.h"
36 
37 #include <set>
38 
39 #include "boost/algorithm/string.hpp"
40 
41 #include "asio.hpp"
42 
43 #include "mongo/config.h"
44 #ifdef MONGO_CONFIG_SSL
45 #include "asio/ssl.hpp"
46 #endif
47 
48 #include "mongo/base/checked_cast.h"
49 #include "mongo/base/system_error.h"
50 #include "mongo/db/server_options.h"
51 #include "mongo/db/service_context.h"
52 #include "mongo/transport/asio_utils.h"
53 #include "mongo/transport/service_entry_point.h"
54 #include "mongo/transport/ticket.h"
55 #include "mongo/transport/ticket_asio.h"
56 #include "mongo/util/log.h"
57 #include "mongo/util/net/hostandport.h"
58 #include "mongo/util/net/message.h"
59 #include "mongo/util/net/sock.h"
60 #include "mongo/util/net/sockaddr.h"
61 #include "mongo/util/net/ssl_manager.h"
62 #include "mongo/util/net/ssl_options.h"
63 
64 // session_asio.h has some header dependencies that require it to be the last header.
65 #include "mongo/transport/session_asio.h"
66 
67 namespace mongo {
68 namespace transport {
69 
Options(const ServerGlobalParams * params)70 TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
71     : port(params->port),
72       ipList(params->bind_ips),
73 #ifndef _WIN32
74       useUnixSockets(!params->noUnixSocket),
75 #endif
76       enableIPv6(params->enableIPv6),
77       maxConns(params->maxConns) {
78 }
79 
/**
 * Constructs a transport layer that is not yet listening.
 *
 * Two separate io_contexts are created: a shared one handed out through getIOContext() and
 * used for accepted sockets, and a privately owned one that drives the acceptors. When SSL is
 * compiled in, the SSL context starts out null and is only created by setup().
 *
 * 'opts' is copied; 'sep' is stored as a raw, non-owning pointer.
 */
TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
                                       ServiceEntryPoint* sep)
    : _workerIOContext(std::make_shared<asio::io_context>()),
      _acceptorIOContext(stdx::make_unique<asio::io_context>()),
#ifdef MONGO_CONFIG_SSL
      _sslContext(nullptr),
#endif
      _sep(sep),
      _listenerOptions(opts) {
}
90 
91 TransportLayerASIO::~TransportLayerASIO() = default;
92 
sourceMessage(const SessionHandle & session,Message * message,Date_t expiration)93 Ticket TransportLayerASIO::sourceMessage(const SessionHandle& session,
94                                          Message* message,
95                                          Date_t expiration) {
96     auto asioSession = checked_pointer_cast<ASIOSession>(session);
97     auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message);
98     return {this, std::move(ticket)};
99 }
100 
sinkMessage(const SessionHandle & session,const Message & message,Date_t expiration)101 Ticket TransportLayerASIO::sinkMessage(const SessionHandle& session,
102                                        const Message& message,
103                                        Date_t expiration) {
104     auto asioSession = checked_pointer_cast<ASIOSession>(session);
105     auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message);
106     return {this, std::move(ticket)};
107 }
108 
wait(Ticket && ticket)109 Status TransportLayerASIO::wait(Ticket&& ticket) {
110     auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket));
111     auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());
112 
113     Status waitStatus = Status::OK();
114     asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; });
115 
116     return waitStatus;
117 }
118 
asyncWait(Ticket && ticket,TicketCallback callback)119 void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {
120     auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));
121     auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());
122 
123     asioTicket->fill(
124         false,
125         [ callback = std::move(callback),
126           ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); });
127 }
128 
129 // Must not be called while holding the TransportLayerASIO mutex.
end(const SessionHandle & session)130 void TransportLayerASIO::end(const SessionHandle& session) {
131     auto asioSession = checked_pointer_cast<ASIOSession>(session);
132     asioSession->shutdown();
133 }
134 
setup()135 Status TransportLayerASIO::setup() {
136     std::vector<std::string> listenAddrs;
137     if (_listenerOptions.ipList.empty()) {
138         listenAddrs = {"127.0.0.1"};
139         if (_listenerOptions.enableIPv6) {
140             listenAddrs.emplace_back("::1");
141         }
142     } else {
143         listenAddrs = _listenerOptions.ipList;
144     }
145 
146 #ifndef _WIN32
147     if (_listenerOptions.useUnixSockets) {
148         listenAddrs.emplace_back(makeUnixSockPath(_listenerOptions.port));
149     }
150 #endif
151 
152     std::set<SockAddr> boundAddrs;
153     for (auto& ip : listenAddrs) {
154         std::error_code ec;
155         if (ip.empty()) {
156             warning() << "Skipping empty bind address";
157             continue;
158         }
159 
160         const auto addrs = SockAddr::createAll(
161             ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET);
162         if (addrs.empty()) {
163             warning() << "Found no addresses for " << ip;
164             continue;
165         }
166 
167         for (const auto& addr : addrs) {
168             if (!boundAddrs.insert(addr).second) {
169                 // SockAddr::createAll() can return multiple addresses for a single endpoint. This
170                 // introduces the possibility that two different endpoints can return the same
171                 // address. Hence, we filter our addresses for uniqueness.
172                 continue;
173             }
174 
175             asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize);
176 
177 #ifndef _WIN32
178             if (addr.getType() == AF_UNIX) {
179                 if (::unlink(ip.c_str()) == -1 && errno != ENOENT) {
180                     error() << "Failed to unlink socket file " << ip << " "
181                             << errnoWithDescription(errno);
182                     fassertFailedNoTrace(40486);
183                 }
184             }
185 #endif
186             if (addr.getType() == AF_INET6 && !_listenerOptions.enableIPv6) {
187                 error() << "Specified ipv6 bind address, but ipv6 is disabled";
188                 fassertFailedNoTrace(40488);
189             }
190 
191             GenericAcceptor acceptor(*_acceptorIOContext);
192             acceptor.open(endpoint.protocol());
193             acceptor.set_option(GenericAcceptor::reuse_address(true));
194             if (addr.getType() == AF_INET6) {
195                 acceptor.set_option(asio::ip::v6_only(true));
196             }
197 
198             acceptor.non_blocking(true, ec);
199             if (ec) {
200                 return errorCodeToStatus(ec);
201             }
202 
203             acceptor.bind(endpoint, ec);
204             if (ec) {
205                 return errorCodeToStatus(ec);
206             }
207 
208 #ifndef _WIN32
209             if (addr.getType() == AF_UNIX) {
210                 if (::chmod(ip.c_str(), serverGlobalParams.unixSocketPermissions) == -1) {
211                     error() << "Failed to chmod socket file " << ip << " "
212                             << errnoWithDescription(errno);
213                     fassertFailedNoTrace(40487);
214                 }
215             }
216 #endif
217             _acceptors.emplace_back(std::make_pair(std::move(addr), std::move(acceptor)));
218         }
219     }
220 
221     if (_acceptors.empty()) {
222         return Status(ErrorCodes::SocketException, "No available addresses/ports to bind to");
223     }
224 
225 #ifdef MONGO_CONFIG_SSL
226     const auto& sslParams = getSSLGlobalParams();
227 
228     if (_sslMode() != SSLParams::SSLMode_disabled) {
229         _sslContext = stdx::make_unique<asio::ssl::context>(asio::ssl::context::sslv23);
230 
231         Status status =
232             getSSLManager()->initSSLContext(_sslContext->native_handle(),
233                                             sslParams,
234                                             SSLManagerInterface::ConnectionDirection::kIncoming);
235         if (!status.isOK()) {
236             return status;
237         }
238     }
239 #endif
240 
241     return Status::OK();
242 }
243 
start()244 Status TransportLayerASIO::start() {
245     stdx::lock_guard<stdx::mutex> lk(_mutex);
246     _running.store(true);
247 
248     _listenerThread = stdx::thread([this] {
249         setThreadName("listener");
250         while (_running.load()) {
251             asio::io_context::work work(*_acceptorIOContext);
252             try {
253                 _acceptorIOContext->run();
254             } catch (...) {
255                 severe() << "Uncaught exception in the listener: " << exceptionToStatus();
256                 fassertFailed(40491);
257             }
258         }
259     });
260 
261     for (auto& acceptor : _acceptors) {
262         asio::error_code ec;
263         acceptor.second.listen(serverGlobalParams.listenBacklog, ec);
264         if (ec) {
265             severe() << "Error listening for new connections on " << acceptor.first << ": "
266                      << ec.message();
267             fassertFailed(31339);
268         }
269         log() << "listening via socket bound to " << acceptor.first.getAddr();
270 
271         _acceptConnection(acceptor.second);
272     }
273 
274     const char* ssl = "";
275 #ifdef MONGO_CONFIG_SSL
276     if (_sslMode() != SSLParams::SSLMode_disabled) {
277         ssl = " ssl";
278     }
279 #endif
280     log() << "waiting for connections on port " << _listenerOptions.port << ssl;
281 
282     return Status::OK();
283 }
284 
shutdown()285 void TransportLayerASIO::shutdown() {
286     stdx::lock_guard<stdx::mutex> lk(_mutex);
287     _running.store(false);
288 
289     // Loop through the acceptors and cancel their calls to async_accept. This will prevent new
290     // connections from being opened.
291     for (auto& acceptor : _acceptors) {
292         acceptor.second.cancel();
293         auto& addr = acceptor.first;
294         if (addr.getType() == AF_UNIX && !addr.isAnonymousUNIXSocket()) {
295             auto path = addr.getAddr();
296             log() << "removing socket file: " << path;
297             if (::unlink(path.c_str()) != 0) {
298                 const auto ewd = errnoWithDescription();
299                 warning() << "Unable to remove UNIX socket " << path << ": " << ewd;
300             }
301         }
302     }
303 
304     // If the listener thread is joinable (that is, we created/started a listener thread), then
305     // the io_context is owned exclusively by the TransportLayer and we should stop it and join
306     // the listener thread.
307     //
308     // Otherwise the ServiceExecutor may need to continue running the io_context to drain running
309     // connections, so we just cancel the acceptors and return.
310     if (_listenerThread.joinable()) {
311         _acceptorIOContext->stop();
312         _listenerThread.join();
313     }
314 }
315 
getIOContext()316 const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() {
317     return _workerIOContext;
318 }
319 
_acceptConnection(GenericAcceptor & acceptor)320 void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
321     auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {
322         if (!_running.load())
323             return;
324 
325         if (ec) {
326             log() << "Error accepting new connection on "
327                   << endpointToHostAndPort(acceptor.local_endpoint()) << ": " << ec.message();
328             _acceptConnection(acceptor);
329             return;
330         }
331 
332         try {
333             std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket)));
334             _sep->startSession(std::move(session));
335         } catch (const DBException& e) {
336             warning() << "Error accepting new connection " << e;
337         }
338 
339         _acceptConnection(acceptor);
340     };
341 
342     acceptor.async_accept(*_workerIOContext, std::move(acceptCb));
343 }
344 
345 #ifdef MONGO_CONFIG_SSL
_sslMode() const346 SSLParams::SSLModes TransportLayerASIO::_sslMode() const {
347     return static_cast<SSLParams::SSLModes>(getSSLGlobalParams().sslMode.load());
348 }
349 #endif
350 
351 }  // namespace transport
352 }  // namespace mongo
353