1 /*
2  * TcpIpAsyncConnector.hpp
3  *
4  * Copyright (C) 2021 by RStudio, PBC
5  *
6  * Unless you have received this program directly from RStudio pursuant
7  * to the terms of a commercial license agreement with RStudio, then
8  * this program is licensed to you under the terms of version 3 of the
9  * GNU Affero General Public License. This program is distributed WITHOUT
10  * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11  * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12  * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13  *
14  */
15 
16 #ifndef CORE_HTTP_TCP_IP_ASYNC_CONNECTOR_HPP
17 #define CORE_HTTP_TCP_IP_ASYNC_CONNECTOR_HPP
18 
19 #include <boost/function.hpp>
20 #include <boost/shared_ptr.hpp>
21 #include <boost/enable_shared_from_this.hpp>
22 
23 #include <boost/asio/deadline_timer.hpp>
24 #include <boost/asio/ip/tcp.hpp>
25 #include <boost/asio/placeholders.hpp>
26 
27 #include <core/http/TcpIpSocketUtils.hpp>
28 #include <core/Thread.hpp>
29 
// Catch clauses shared by the asynchronous handlers of TcpIpAsyncConnector.
// Place directly after a try block inside a member function: any exception
// caught here is forwarded to handleUnexpectedError(), which in turn reports
// it through the user's ErrorHandler.
#define CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION \
   catch(const std::exception& ex) \
   { \
      const std::string description = \
         std::string("Unexpected exception: ") + ex.what(); \
      handleUnexpectedError(description, ERROR_LOCATION); \
   } \
   catch(...) \
   { \
      handleUnexpectedError("Unknown exception", ERROR_LOCATION); \
   }
42 
43 namespace rstudio {
44 namespace core {
45 namespace http {
46 
47 class TcpIpAsyncConnector :
48       public boost::enable_shared_from_this<TcpIpAsyncConnector>,
49       boost::noncopyable
50 {
51 public:
52    typedef boost::function<void()> ConnectedHandler;
53    typedef boost::function<void(const core::Error&)> ErrorHandler;
54 
55 public:
TcpIpAsyncConnector(boost::asio::io_service & ioService,boost::asio::ip::tcp::socket * pSocket)56    TcpIpAsyncConnector(boost::asio::io_service& ioService,
57                        boost::asio::ip::tcp::socket* pSocket)
58      : service_(ioService),
59        pSocket_(pSocket),
60        resolver_(ioService),
61        isConnected_(false),
62        hasFailed_(false)
63    {
64    }
65 
66 public:
connect(const std::string & address,const std::string & port,const ConnectedHandler & connectedHandler,const ErrorHandler & errorHandler,const boost::posix_time::time_duration & timeout=boost::posix_time::time_duration (boost::posix_time::pos_infin))67    void connect(const std::string& address,
68                 const std::string& port,
69                 const ConnectedHandler& connectedHandler,
70                 const ErrorHandler& errorHandler,
71                 const boost::posix_time::time_duration& timeout =
72                    boost::posix_time::time_duration(boost::posix_time::pos_infin))
73    {
74       // save handlers
75       connectedHandler_ = connectedHandler;
76       errorHandler_ = errorHandler;
77 
78       if (!timeout.is_special())
79       {
80          // start a timer that will cancel any outstanding asynchronous operations
81          // when it elapses if the connection operation has not succeeded
82          pConnectionTimer_.reset(new boost::asio::deadline_timer(service_, timeout));
83          pConnectionTimer_->async_wait(boost::bind(&TcpIpAsyncConnector::onConnectionTimeout,
84                                                    TcpIpAsyncConnector::shared_from_this(),
85                                                    boost::asio::placeholders::error));
86       }
87 
88       // start an async resolve
89       boost::asio::ip::tcp::resolver::query query(address, port);
90       resolver_.async_resolve(
91             query,
92             boost::bind(&TcpIpAsyncConnector::handleResolve,
93                         TcpIpAsyncConnector::shared_from_this(),
94                         boost::asio::placeholders::error,
95                         boost::asio::placeholders::iterator));
96    }
97 
98 private:
99 
onConnectionTimeout(const boost::system::error_code & ec)100    void onConnectionTimeout(const boost::system::error_code& ec)
101    {
102       try
103       {
104          if (ec == boost::system::errc::operation_canceled)
105             return;
106 
107          LOCK_MUTEX(mutex_)
108          {
109             if (isConnected_ || hasFailed_)
110                return;
111 
112             // timer has elapsed and the socket is still not connected
113             // cancel any outstanding async operations
114             resolver_.cancel();
115             pSocket_->cancel();
116 
117             // invoke error handler since the connection has failed
118             handleError(systemError(boost::system::errc::timed_out, ERROR_LOCATION));
119          }
120          END_LOCK_MUTEX
121       }
122       CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION
123    }
124 
handleResolve(const boost::system::error_code & ec,boost::asio::ip::tcp::resolver::iterator endpoint_iterator)125    void handleResolve(
126          const boost::system::error_code& ec,
127          boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
128    {
129       if (ec == boost::asio::error::operation_aborted)
130          return;
131 
132       LOCK_MUTEX(mutex_)
133       {
134          if (hasFailed_)
135             return;
136 
137          try
138          {
139             if (!ec)
140             {
141                // work-around - in some rare instances, we've seen that Boost will still
142                // return us an empty endpoint_iterator, even when successful, which is
143                // contrary to the documentation
144                if (endpoint_iterator == boost::asio::ip::tcp::resolver::iterator())
145                {
146                   handleErrorCode(boost::system::error_code(boost::system::errc::io_error,
147                                                             boost::system::system_category()),
148                                   ERROR_LOCATION);
149                   return;
150                }
151 
152                // try endpoints until we successfully connect with one
153                boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
154                pSocket_->async_connect(
155                   endpoint,
156                   boost::bind(&TcpIpAsyncConnector::handleConnect,
157                               TcpIpAsyncConnector::shared_from_this(),
158                               boost::asio::placeholders::error,
159                               ++endpoint_iterator));
160             }
161             else
162             {
163                handleErrorCode(ec, ERROR_LOCATION);
164             }
165          }
166          CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION
167       }
168       END_LOCK_MUTEX
169    }
170 
handleConnect(const boost::system::error_code & ec,boost::asio::ip::tcp::resolver::iterator endpoint_iterator)171    void handleConnect(
172          const boost::system::error_code& ec,
173          boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
174    {
175       if (ec == boost::asio::error::operation_aborted)
176          return;
177 
178       LOCK_MUTEX(mutex_)
179       {
180          if (hasFailed_)
181             return;
182 
183          try
184          {
185             if (!ec)
186             {
187                isConnected_ = true;
188 
189                if (pConnectionTimer_)
190                   pConnectionTimer_->cancel();
191 
192                if (connectedHandler_)
193                   connectedHandler_();
194             }
195             else if (endpoint_iterator !=
196                      boost::asio::ip::tcp::resolver::iterator())
197             {
198                // try next endpoint
199                pSocket_->close();
200                boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
201                pSocket_->async_connect(
202                   endpoint,
203                   boost::bind(&TcpIpAsyncConnector::handleConnect,
204                               TcpIpAsyncConnector::shared_from_this(),
205                               boost::asio::placeholders::error,
206                               ++endpoint_iterator));
207             }
208             else
209             {
210                handleErrorCode(ec, ERROR_LOCATION);
211             }
212          }
213          CATCH_UNEXPECTED_ASYNC_CONNECTOR_EXCEPTION
214       }
215       END_LOCK_MUTEX
216    }
217 
handleError(const Error & error)218    void handleError(const Error& error)
219    {
220       hasFailed_ = true;
221 
222       if (errorHandler_)
223          errorHandler_(error);
224    }
225 
handleErrorCode(const boost::system::error_code & ec,const ErrorLocation & location)226    void handleErrorCode(const boost::system::error_code& ec,
227                         const ErrorLocation& location)
228    {
229       if (pConnectionTimer_)
230          pConnectionTimer_->cancel();
231 
232       handleError(Error(ec, location));
233    }
234 
handleUnexpectedError(const std::string & description,const ErrorLocation & location)235    void handleUnexpectedError(const std::string& description,
236                               const ErrorLocation& location)
237    {
238       if (pConnectionTimer_)
239          pConnectionTimer_->cancel();
240 
241       Error error = systemError(boost::system::errc::state_not_recoverable,
242                                 description,
243                                 location);
244       handleError(error);
245    }
246 
247 private:
248    boost::asio::io_service& service_;
249    boost::asio::ip::tcp::socket* pSocket_;
250    boost::asio::ip::tcp::resolver resolver_;
251    ConnectedHandler connectedHandler_;
252    ErrorHandler errorHandler_;
253 
254    bool isConnected_;
255    bool hasFailed_;
256    boost::mutex mutex_;
257    boost::shared_ptr<boost::asio::deadline_timer> pConnectionTimer_;
258 };
259 
260 } // namespace http
261 } // namespace core
262 } // namespace rstudio
263 
264 #endif // CORE_HTTP_TCP_IP_ASYNC_CONNECTOR_HPP
265