1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/transport/transport_layer_asio.h"
36
37 #include <set>
38
39 #include "boost/algorithm/string.hpp"
40
41 #include "asio.hpp"
42
43 #include "mongo/config.h"
44 #ifdef MONGO_CONFIG_SSL
45 #include "asio/ssl.hpp"
46 #endif
47
48 #include "mongo/base/checked_cast.h"
49 #include "mongo/base/system_error.h"
50 #include "mongo/db/server_options.h"
51 #include "mongo/db/service_context.h"
52 #include "mongo/transport/asio_utils.h"
53 #include "mongo/transport/service_entry_point.h"
54 #include "mongo/transport/ticket.h"
55 #include "mongo/transport/ticket_asio.h"
56 #include "mongo/util/log.h"
57 #include "mongo/util/net/hostandport.h"
58 #include "mongo/util/net/message.h"
59 #include "mongo/util/net/sock.h"
60 #include "mongo/util/net/sockaddr.h"
61 #include "mongo/util/net/ssl_manager.h"
62 #include "mongo/util/net/ssl_options.h"
63
64 // session_asio.h has some header dependencies that require it to be the last header.
65 #include "mongo/transport/session_asio.h"
66
67 namespace mongo {
68 namespace transport {
69
Options(const ServerGlobalParams * params)70 TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
71 : port(params->port),
72 ipList(params->bind_ips),
73 #ifndef _WIN32
74 useUnixSockets(!params->noUnixSocket),
75 #endif
76 enableIPv6(params->enableIPv6),
77 maxConns(params->maxConns) {
78 }
79
/**
 * Constructs a transport layer that will listen according to `opts` and hand accepted sessions
 * to `sep`.
 *
 * Two io_contexts are created up front: a shared one for worker I/O (exposed through
 * getIOContext()) and a uniquely owned one for the acceptors. No sockets are opened here; that
 * happens in setup().
 */
TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
                                       ServiceEntryPoint* sep)
    : _workerIOContext(std::make_shared<asio::io_context>()),
      _acceptorIOContext(stdx::make_unique<asio::io_context>()),
#ifdef MONGO_CONFIG_SSL
      // Stays null until setup() creates the context (only when SSL is not disabled).
      _sslContext(nullptr),
#endif
      _sep(sep),
      _listenerOptions(opts) {
}
90
91 TransportLayerASIO::~TransportLayerASIO() = default;
92
sourceMessage(const SessionHandle & session,Message * message,Date_t expiration)93 Ticket TransportLayerASIO::sourceMessage(const SessionHandle& session,
94 Message* message,
95 Date_t expiration) {
96 auto asioSession = checked_pointer_cast<ASIOSession>(session);
97 auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message);
98 return {this, std::move(ticket)};
99 }
100
sinkMessage(const SessionHandle & session,const Message & message,Date_t expiration)101 Ticket TransportLayerASIO::sinkMessage(const SessionHandle& session,
102 const Message& message,
103 Date_t expiration) {
104 auto asioSession = checked_pointer_cast<ASIOSession>(session);
105 auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message);
106 return {this, std::move(ticket)};
107 }
108
wait(Ticket && ticket)109 Status TransportLayerASIO::wait(Ticket&& ticket) {
110 auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket));
111 auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());
112
113 Status waitStatus = Status::OK();
114 asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; });
115
116 return waitStatus;
117 }
118
asyncWait(Ticket && ticket,TicketCallback callback)119 void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {
120 auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));
121 auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());
122
123 asioTicket->fill(
124 false,
125 [ callback = std::move(callback),
126 ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); });
127 }
128
129 // Must not be called while holding the TransportLayerASIO mutex.
end(const SessionHandle & session)130 void TransportLayerASIO::end(const SessionHandle& session) {
131 auto asioSession = checked_pointer_cast<ASIOSession>(session);
132 asioSession->shutdown();
133 }
134
setup()135 Status TransportLayerASIO::setup() {
136 std::vector<std::string> listenAddrs;
137 if (_listenerOptions.ipList.empty()) {
138 listenAddrs = {"127.0.0.1"};
139 if (_listenerOptions.enableIPv6) {
140 listenAddrs.emplace_back("::1");
141 }
142 } else {
143 listenAddrs = _listenerOptions.ipList;
144 }
145
146 #ifndef _WIN32
147 if (_listenerOptions.useUnixSockets) {
148 listenAddrs.emplace_back(makeUnixSockPath(_listenerOptions.port));
149 }
150 #endif
151
152 std::set<SockAddr> boundAddrs;
153 for (auto& ip : listenAddrs) {
154 std::error_code ec;
155 if (ip.empty()) {
156 warning() << "Skipping empty bind address";
157 continue;
158 }
159
160 const auto addrs = SockAddr::createAll(
161 ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET);
162 if (addrs.empty()) {
163 warning() << "Found no addresses for " << ip;
164 continue;
165 }
166
167 for (const auto& addr : addrs) {
168 if (!boundAddrs.insert(addr).second) {
169 // SockAddr::createAll() can return multiple addresses for a single endpoint. This
170 // introduces the possibility that two different endpoints can return the same
171 // address. Hence, we filter our addresses for uniqueness.
172 continue;
173 }
174
175 asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize);
176
177 #ifndef _WIN32
178 if (addr.getType() == AF_UNIX) {
179 if (::unlink(ip.c_str()) == -1 && errno != ENOENT) {
180 error() << "Failed to unlink socket file " << ip << " "
181 << errnoWithDescription(errno);
182 fassertFailedNoTrace(40486);
183 }
184 }
185 #endif
186 if (addr.getType() == AF_INET6 && !_listenerOptions.enableIPv6) {
187 error() << "Specified ipv6 bind address, but ipv6 is disabled";
188 fassertFailedNoTrace(40488);
189 }
190
191 GenericAcceptor acceptor(*_acceptorIOContext);
192 acceptor.open(endpoint.protocol());
193 acceptor.set_option(GenericAcceptor::reuse_address(true));
194 if (addr.getType() == AF_INET6) {
195 acceptor.set_option(asio::ip::v6_only(true));
196 }
197
198 acceptor.non_blocking(true, ec);
199 if (ec) {
200 return errorCodeToStatus(ec);
201 }
202
203 acceptor.bind(endpoint, ec);
204 if (ec) {
205 return errorCodeToStatus(ec);
206 }
207
208 #ifndef _WIN32
209 if (addr.getType() == AF_UNIX) {
210 if (::chmod(ip.c_str(), serverGlobalParams.unixSocketPermissions) == -1) {
211 error() << "Failed to chmod socket file " << ip << " "
212 << errnoWithDescription(errno);
213 fassertFailedNoTrace(40487);
214 }
215 }
216 #endif
217 _acceptors.emplace_back(std::make_pair(std::move(addr), std::move(acceptor)));
218 }
219 }
220
221 if (_acceptors.empty()) {
222 return Status(ErrorCodes::SocketException, "No available addresses/ports to bind to");
223 }
224
225 #ifdef MONGO_CONFIG_SSL
226 const auto& sslParams = getSSLGlobalParams();
227
228 if (_sslMode() != SSLParams::SSLMode_disabled) {
229 _sslContext = stdx::make_unique<asio::ssl::context>(asio::ssl::context::sslv23);
230
231 Status status =
232 getSSLManager()->initSSLContext(_sslContext->native_handle(),
233 sslParams,
234 SSLManagerInterface::ConnectionDirection::kIncoming);
235 if (!status.isOK()) {
236 return status;
237 }
238 }
239 #endif
240
241 return Status::OK();
242 }
243
start()244 Status TransportLayerASIO::start() {
245 stdx::lock_guard<stdx::mutex> lk(_mutex);
246 _running.store(true);
247
248 _listenerThread = stdx::thread([this] {
249 setThreadName("listener");
250 while (_running.load()) {
251 asio::io_context::work work(*_acceptorIOContext);
252 try {
253 _acceptorIOContext->run();
254 } catch (...) {
255 severe() << "Uncaught exception in the listener: " << exceptionToStatus();
256 fassertFailed(40491);
257 }
258 }
259 });
260
261 for (auto& acceptor : _acceptors) {
262 asio::error_code ec;
263 acceptor.second.listen(serverGlobalParams.listenBacklog, ec);
264 if (ec) {
265 severe() << "Error listening for new connections on " << acceptor.first << ": "
266 << ec.message();
267 fassertFailed(31339);
268 }
269 log() << "listening via socket bound to " << acceptor.first.getAddr();
270
271 _acceptConnection(acceptor.second);
272 }
273
274 const char* ssl = "";
275 #ifdef MONGO_CONFIG_SSL
276 if (_sslMode() != SSLParams::SSLMode_disabled) {
277 ssl = " ssl";
278 }
279 #endif
280 log() << "waiting for connections on port " << _listenerOptions.port << ssl;
281
282 return Status::OK();
283 }
284
shutdown()285 void TransportLayerASIO::shutdown() {
286 stdx::lock_guard<stdx::mutex> lk(_mutex);
287 _running.store(false);
288
289 // Loop through the acceptors and cancel their calls to async_accept. This will prevent new
290 // connections from being opened.
291 for (auto& acceptor : _acceptors) {
292 acceptor.second.cancel();
293 auto& addr = acceptor.first;
294 if (addr.getType() == AF_UNIX && !addr.isAnonymousUNIXSocket()) {
295 auto path = addr.getAddr();
296 log() << "removing socket file: " << path;
297 if (::unlink(path.c_str()) != 0) {
298 const auto ewd = errnoWithDescription();
299 warning() << "Unable to remove UNIX socket " << path << ": " << ewd;
300 }
301 }
302 }
303
304 // If the listener thread is joinable (that is, we created/started a listener thread), then
305 // the io_context is owned exclusively by the TransportLayer and we should stop it and join
306 // the listener thread.
307 //
308 // Otherwise the ServiceExecutor may need to continue running the io_context to drain running
309 // connections, so we just cancel the acceptors and return.
310 if (_listenerThread.joinable()) {
311 _acceptorIOContext->stop();
312 _listenerThread.join();
313 }
314 }
315
getIOContext()316 const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() {
317 return _workerIOContext;
318 }
319
_acceptConnection(GenericAcceptor & acceptor)320 void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
321 auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {
322 if (!_running.load())
323 return;
324
325 if (ec) {
326 log() << "Error accepting new connection on "
327 << endpointToHostAndPort(acceptor.local_endpoint()) << ": " << ec.message();
328 _acceptConnection(acceptor);
329 return;
330 }
331
332 try {
333 std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket)));
334 _sep->startSession(std::move(session));
335 } catch (const DBException& e) {
336 warning() << "Error accepting new connection " << e;
337 }
338
339 _acceptConnection(acceptor);
340 };
341
342 acceptor.async_accept(*_workerIOContext, std::move(acceptCb));
343 }
344
#ifdef MONGO_CONFIG_SSL
/**
 * Loads the current value of the global sslMode setting and returns it converted to
 * SSLParams::SSLModes. The value is read on every call rather than cached.
 */
SSLParams::SSLModes TransportLayerASIO::_sslMode() const {
    const auto rawMode = getSSLGlobalParams().sslMode.load();
    return static_cast<SSLParams::SSLModes>(rawMode);
}
#endif
350
351 } // namespace transport
352 } // namespace mongo
353