1 /* 2 * TcpIpAsyncConnector.hpp 3 * 4 * Copyright (C) 2021 by RStudio, PBC 5 * 6 * Unless you have received this program directly from RStudio pursuant 7 * to the terms of a commercial license agreement with RStudio, then 8 * this program is licensed to you under the terms of version 3 of the 9 * GNU Affero General Public License. This program is distributed WITHOUT 10 * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT, 11 * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the 12 * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details. 13 * 14 */ 15 16 #ifndef CORE_HTTP_TCP_IP_ASYNC_CONNECTOR_HPP 17 #define CORE_HTTP_TCP_IP_ASYNC_CONNECTOR_HPP 18 19 #include <boost/function.hpp> 20 #include <boost/shared_ptr.hpp> 21 #include <boost/enable_shared_from_this.hpp> 22 23 #include <boost/asio/deadline_timer.hpp> 24 #include <boost/asio/ip/tcp.hpp> 25 #include <boost/asio/placeholders.hpp> 26 27 #include <core/http/TcpIpSocketUtils.hpp> 28 #include <core/Thread.hpp> 29 30 // special version of unexpected exception handler which makes 31 // sure to call the user's ErrorHandler 32 #define CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION \ 33 catch(const std::exception& e) \ 34 { \ 35 handleUnexpectedError(std::string("Unexpected exception: ") + \ 36 e.what(), ERROR_LOCATION); \ 37 } \ 38 catch(...) \ 39 { \ 40 handleUnexpectedError("Unknown exception", ERROR_LOCATION); \ 41 } 42 43 namespace rstudio { 44 namespace core { 45 namespace http { 46 47 class TcpIpAsyncConnector : 48 public boost::enable_shared_from_this<TcpIpAsyncConnector>, 49 boost::noncopyable 50 { 51 public: 52 typedef boost::function<void()> ConnectedHandler; 53 typedef boost::function<void(const core::Error&)> ErrorHandler; 54 55 public: TcpIpAsyncConnector(boost::asio::io_service & ioService,boost::asio::ip::tcp::socket * pSocket)56 TcpIpAsyncConnector(boost::asio::io_service& ioService, 57 boost::asio::ip::tcp::socket* pSocket) 58 : service_(ioService), 59 pSocket_(pSocket), 60 resolver_(ioService), 61 isConnected_(false), 62 hasFailed_(false) 63 { 64 } 65 66 public: connect(const std::string & address,const std::string & port,const ConnectedHandler & connectedHandler,const ErrorHandler & errorHandler,const boost::posix_time::time_duration & timeout=boost::posix_time::time_duration (boost::posix_time::pos_infin))67 void connect(const std::string& address, 68 const std::string& port, 69 const ConnectedHandler& connectedHandler, 70 const ErrorHandler& errorHandler, 71 const boost::posix_time::time_duration& timeout = 72 boost::posix_time::time_duration(boost::posix_time::pos_infin)) 73 { 74 // save handlers 75 connectedHandler_ = connectedHandler; 76 errorHandler_ = errorHandler; 77 78 if (!timeout.is_special()) 79 { 80 // start a timer that will cancel any outstanding asynchronous operations 81 // when it elapses if the connection operation has not succeeded 82 pConnectionTimer_.reset(new boost::asio::deadline_timer(service_, timeout)); 83 pConnectionTimer_->async_wait(boost::bind(&TcpIpAsyncConnector::onConnectionTimeout, 84 TcpIpAsyncConnector::shared_from_this(), 85 boost::asio::placeholders::error)); 86 } 87 88 // start an async resolve 89 boost::asio::ip::tcp::resolver::query query(address, port); 90 resolver_.async_resolve( 91 query, 92 boost::bind(&TcpIpAsyncConnector::handleResolve, 93 TcpIpAsyncConnector::shared_from_this(), 94 boost::asio::placeholders::error, 95 boost::asio::placeholders::iterator)); 96 } 97 98 private: 99 onConnectionTimeout(const boost::system::error_code & ec)100 void onConnectionTimeout(const boost::system::error_code& ec) 101 { 102 try 103 { 104 if (ec == boost::system::errc::operation_canceled) 105 return; 106 107 LOCK_MUTEX(mutex_) 108 { 109 if (isConnected_ || hasFailed_) 110 return; 111 112 // timer has elapsed and the socket is still not connected 113 // cancel any outstanding async operations 114 resolver_.cancel(); 115 pSocket_->cancel(); 116 117 // invoke error handler since the connection has failed 118 handleError(systemError(boost::system::errc::timed_out, ERROR_LOCATION)); 119 } 120 END_LOCK_MUTEX 121 } 122 CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION 123 } 124 handleResolve(const boost::system::error_code & ec,boost::asio::ip::tcp::resolver::iterator endpoint_iterator)125 void handleResolve( 126 const boost::system::error_code& ec, 127 boost::asio::ip::tcp::resolver::iterator endpoint_iterator) 128 { 129 if (ec == boost::asio::error::operation_aborted) 130 return; 131 132 LOCK_MUTEX(mutex_) 133 { 134 if (hasFailed_) 135 return; 136 137 try 138 { 139 if (!ec) 140 { 141 // work-around - in some rare instances, we've seen that Boost will still 142 // return us an empty endpoint_iterator, even when successful, which is 143 // contrary to the documentation 144 if (endpoint_iterator == boost::asio::ip::tcp::resolver::iterator()) 145 { 146 handleErrorCode(boost::system::error_code(boost::system::errc::io_error, 147 boost::system::system_category()), 148 ERROR_LOCATION); 149 return; 150 } 151 152 // try endpoints until we successfully connect with one 153 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; 154 pSocket_->async_connect( 155 endpoint, 156 boost::bind(&TcpIpAsyncConnector::handleConnect, 157 TcpIpAsyncConnector::shared_from_this(), 158 boost::asio::placeholders::error, 159 ++endpoint_iterator)); 160 } 161 else 162 { 163 handleErrorCode(ec, ERROR_LOCATION); 164 } 165 } 166 CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION 167 } 168 END_LOCK_MUTEX 169 } 170 handleConnect(const boost::system::error_code & ec,boost::asio::ip::tcp::resolver::iterator endpoint_iterator)171 void handleConnect( 172 const boost::system::error_code& ec, 173 boost::asio::ip::tcp::resolver::iterator endpoint_iterator) 174 { 175 if (ec == boost::asio::error::operation_aborted) 176 return; 177 178 LOCK_MUTEX(mutex_) 179 { 180 if (hasFailed_) 181 return; 182 183 try 184 { 185 if (!ec) 186 { 187 isConnected_ = true; 188 189 if (pConnectionTimer_) 190 pConnectionTimer_->cancel(); 191 192 if (connectedHandler_) 193 connectedHandler_(); 194 } 195 else if (endpoint_iterator != 196 boost::asio::ip::tcp::resolver::iterator()) 197 { 198 // try next endpoint 199 pSocket_->close(); 200 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; 201 pSocket_->async_connect( 202 endpoint, 203 boost::bind(&TcpIpAsyncConnector::handleConnect, 204 TcpIpAsyncConnector::shared_from_this(), 205 boost::asio::placeholders::error, 206 ++endpoint_iterator)); 207 } 208 else 209 { 210 handleErrorCode(ec, ERROR_LOCATION); 211 } 212 } 213 CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION 214 } 215 END_LOCK_MUTEX 216 } 217 handleError(const Error & error)218 void handleError(const Error& error) 219 { 220 hasFailed_ = true; 221 222 if (errorHandler_) 223 errorHandler_(error); 224 } 225 handleErrorCode(const boost::system::error_code & ec,const ErrorLocation & location)226 void handleErrorCode(const boost::system::error_code& ec, 227 const ErrorLocation& location) 228 { 229 if (pConnectionTimer_) 230 pConnectionTimer_->cancel(); 231 232 handleError(Error(ec, location)); 233 } 234 handleUnexpectedError(const std::string & description,const ErrorLocation & location)235 void handleUnexpectedError(const std::string& description, 236 const ErrorLocation& location) 237 { 238 if (pConnectionTimer_) 239 pConnectionTimer_->cancel(); 240 241 Error error = systemError(boost::system::errc::state_not_recoverable, 242 description, 243 location); 244 handleError(error); 245 } 246 247 private: 248 boost::asio::io_service& service_; 249 boost::asio::ip::tcp::socket* pSocket_; 250 boost::asio::ip::tcp::resolver resolver_; 251 ConnectedHandler connectedHandler_; 252 ErrorHandler errorHandler_; 253 254 bool isConnected_; 255 bool hasFailed_; 256 boost::mutex mutex_; 257 boost::shared_ptr<boost::asio::deadline_timer> pConnectionTimer_; 258 }; 259 260 } // namespace http 261 } // namespace core 262 } // namespace rstudio 263 264 #endif // CORE_HTTP_TCP_IP_ASYNC_CONNECTOR_HPP 265