1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include <asio.hpp>
32 #include <system_error>
33 #include <utility>
34
35 #include "mongo/util/assert_util.h"
36 #include "mongo/util/net/sock.h"
37
38 namespace mongo {
39 namespace executor {
40
41 void logCloseFailed(std::error_code ec);
42
43 template <typename ASIOStream>
destroyStream(ASIOStream * stream,bool connected)44 void destroyStream(ASIOStream* stream, bool connected) {
45 if (!connected) {
46 return;
47 }
48
49 std::error_code ec;
50
51 stream->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
52 if (ec) {
53 logCloseFailed(ec);
54 }
55
56 stream->close(ec);
57 if (ec) {
58 logCloseFailed(ec);
59 }
60 }
61
62 template <typename ASIOStream, typename Buffer, typename Handler>
writeStream(ASIOStream * stream,asio::io_service::strand * strand,bool connected,Buffer && buffer,Handler && handler)63 void writeStream(ASIOStream* stream,
64 asio::io_service::strand* strand,
65 bool connected,
66 Buffer&& buffer,
67 Handler&& handler) {
68 invariant(connected);
69 asio::async_write(*stream,
70 asio::buffer(std::forward<Buffer>(buffer)),
71 strand->wrap(std::forward<Handler>(handler)));
72 }
73
74 template <typename ASIOStream, typename Buffer, typename Handler>
readStream(ASIOStream * stream,asio::io_service::strand * strand,bool connected,Buffer && buffer,Handler && handler)75 void readStream(ASIOStream* stream,
76 asio::io_service::strand* strand,
77 bool connected,
78 Buffer&& buffer,
79 Handler&& handler) {
80 invariant(connected);
81 asio::async_read(*stream,
82 asio::buffer(std::forward<Buffer>(buffer)),
83 strand->wrap(std::forward<Handler>(handler)));
84 }
85
86 void logCancelFailed(std::error_code ec);
87
88 template <typename ASIOStream>
cancelStream(ASIOStream * stream)89 void cancelStream(ASIOStream* stream) {
90 std::error_code ec;
91 stream->cancel(ec);
92 if (ec) {
93 logCancelFailed(ec);
94 }
95 }
96
97 void logFailureInSetStreamNonBlocking(std::error_code ec);
98 void logFailureInSetStreamNoDelay(std::error_code ec);
99 void logFailureInSetStreamKeepAlive(std::error_code ec);
100
101 template <typename ASIOStream>
setStreamNonBlocking(ASIOStream * stream)102 std::error_code setStreamNonBlocking(ASIOStream* stream) {
103 std::error_code ec;
104 stream->non_blocking(true, ec);
105 if (ec) {
106 logFailureInSetStreamNonBlocking(ec);
107 }
108 return ec;
109 }
110
111 template <typename ASIOStream>
setStreamNoDelay(ASIOStream * stream)112 std::error_code setStreamNoDelay(ASIOStream* stream) {
113 std::error_code ec;
114 stream->set_option(asio::ip::tcp::no_delay(true), ec);
115 if (ec) {
116 logFailureInSetStreamNoDelay(ec);
117 }
118 return ec;
119 }
120
121 template <typename ASIOStream>
setStreamKeepAlive(ASIOStream * stream)122 std::error_code setStreamKeepAlive(ASIOStream* stream) {
123 std::error_code ec;
124 stream->set_option(asio::socket_base::keep_alive(true), ec);
125 if (ec) {
126 logFailureInSetStreamKeepAlive(ec);
127 }
128 setSocketKeepAliveParams(stream->native_handle());
129 return ec;
130 }
131
132 void logUnexpectedErrorInCheckOpen(std::error_code ec);
133
134 template <typename ASIOStream>
checkIfStreamIsOpen(ASIOStream * stream,bool connected)135 bool checkIfStreamIsOpen(ASIOStream* stream, bool connected) {
136 if (!connected) {
137 return false;
138 };
139 std::error_code ec;
140 std::array<char, 1> buf;
141 // Although we call the blocking form of receive, we ensure the socket is in non-blocking mode.
142 // ASIO implements receive on POSIX using the 'recvmsg' system call, which returns immediately
143 // if the socket is non-blocking and in a valid state, but there is no data to receive. On
144 // windows, receive is implemented with WSARecv, which has the same semantics.
145 invariant(stream->non_blocking());
146 stream->receive(asio::buffer(buf), asio::socket_base::message_peek, ec);
147 if (!ec || ec == asio::error::would_block || ec == asio::error::try_again) {
148 // If the read worked or we got EWOULDBLOCK or EAGAIN (since we are in non-blocking mode),
149 // we assume the socket is still open.
150 return true;
151 } else if (ec == asio::error::eof || ec == asio::error::connection_reset ||
152 ec == asio::error::network_reset) {
153 return false;
154 }
155 // We got a different error. Log it and return false so we throw the connection away.
156 logUnexpectedErrorInCheckOpen(ec);
157 return false;
158 }
159
160 } // namespace executor
161 } // namespace mongo
162