1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 
34 #include "macros.hpp"
35 #include "socks_connecter.hpp"
36 #include "random.hpp"
37 #include "err.hpp"
38 #include "ip.hpp"
39 #include "tcp.hpp"
40 #include "address.hpp"
41 #include "tcp_address.hpp"
42 #include "session_base.hpp"
43 #include "socks.hpp"
44 
45 #ifndef ZMQ_HAVE_WINDOWS
46 #include <unistd.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #if defined ZMQ_HAVE_VXWORKS
50 #include <sockLib.h>
51 #endif
52 #endif
53 
socks_connecter_t(class io_thread_t * io_thread_,class session_base_t * session_,const options_t & options_,address_t * addr_,address_t * proxy_addr_,bool delayed_start_)54 zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
55                                            class session_base_t *session_,
56                                            const options_t &options_,
57                                            address_t *addr_,
58                                            address_t *proxy_addr_,
59                                            bool delayed_start_) :
60     stream_connecter_base_t (
61       io_thread_, session_, options_, addr_, delayed_start_),
62     _proxy_addr (proxy_addr_),
63     _auth_method (socks_no_auth_required),
64     _status (unplugged)
65 {
66     zmq_assert (_addr->protocol == protocol_name::tcp);
67     _proxy_addr->to_string (_endpoint);
68 }
69 
~socks_connecter_t()70 zmq::socks_connecter_t::~socks_connecter_t ()
71 {
72     LIBZMQ_DELETE (_proxy_addr);
73 }
74 
set_auth_method_none()75 void zmq::socks_connecter_t::set_auth_method_none ()
76 {
77     _auth_method = socks_no_auth_required;
78     _auth_username.clear ();
79     _auth_password.clear ();
80 }
81 
set_auth_method_basic(const std::string & username_,const std::string & password_)82 void zmq::socks_connecter_t::set_auth_method_basic (
83   const std::string &username_, const std::string &password_)
84 {
85     _auth_method = socks_basic_auth;
86     _auth_username = username_;
87     _auth_password = password_;
88 }
89 
in_event()90 void zmq::socks_connecter_t::in_event ()
91 {
92     int expected_status = -1;
93     zmq_assert (_status != unplugged);
94 
95     if (_status == waiting_for_choice) {
96         int rc = _choice_decoder.input (_s);
97         if (rc == 0 || rc == -1)
98             error ();
99         else if (_choice_decoder.message_ready ()) {
100             const socks_choice_t choice = _choice_decoder.decode ();
101             rc = process_server_response (choice);
102             if (rc == -1)
103                 error ();
104             else {
105                 if (choice.method == socks_basic_auth)
106                     expected_status = sending_basic_auth_request;
107                 else
108                     expected_status = sending_request;
109             }
110         }
111     } else if (_status == waiting_for_auth_response) {
112         int rc = _auth_response_decoder.input (_s);
113         if (rc == 0 || rc == -1)
114             error ();
115         else if (_auth_response_decoder.message_ready ()) {
116             const socks_auth_response_t auth_response =
117               _auth_response_decoder.decode ();
118             rc = process_server_response (auth_response);
119             if (rc == -1)
120                 error ();
121             else {
122                 expected_status = sending_request;
123             }
124         }
125     } else if (_status == waiting_for_response) {
126         int rc = _response_decoder.input (_s);
127         if (rc == 0 || rc == -1)
128             error ();
129         else if (_response_decoder.message_ready ()) {
130             const socks_response_t response = _response_decoder.decode ();
131             rc = process_server_response (response);
132             if (rc == -1)
133                 error ();
134             else {
135                 rm_handle ();
136                 create_engine (
137                   _s, get_socket_name<tcp_address_t> (_s, socket_end_local));
138                 _s = -1;
139                 _status = unplugged;
140             }
141         }
142     } else
143         error ();
144 
145     if (expected_status == sending_basic_auth_request) {
146         _basic_auth_request_encoder.encode (
147           socks_basic_auth_request_t (_auth_username, _auth_password));
148         reset_pollin (_handle);
149         set_pollout (_handle);
150         _status = sending_basic_auth_request;
151     } else if (expected_status == sending_request) {
152         std::string hostname;
153         uint16_t port = 0;
154         if (parse_address (_addr->address, hostname, port) == -1)
155             error ();
156         else {
157             _request_encoder.encode (socks_request_t (1, hostname, port));
158             reset_pollin (_handle);
159             set_pollout (_handle);
160             _status = sending_request;
161         }
162     }
163 }
164 
out_event()165 void zmq::socks_connecter_t::out_event ()
166 {
167     zmq_assert (
168       _status == waiting_for_proxy_connection || _status == sending_greeting
169       || _status == sending_basic_auth_request || _status == sending_request);
170 
171     if (_status == waiting_for_proxy_connection) {
172         const int rc = static_cast<int> (check_proxy_connection ());
173         if (rc == -1)
174             error ();
175         else {
176             _greeting_encoder.encode (socks_greeting_t (_auth_method));
177             _status = sending_greeting;
178         }
179     } else if (_status == sending_greeting) {
180         zmq_assert (_greeting_encoder.has_pending_data ());
181         const int rc = _greeting_encoder.output (_s);
182         if (rc == -1 || rc == 0)
183             error ();
184         else if (!_greeting_encoder.has_pending_data ()) {
185             reset_pollout (_handle);
186             set_pollin (_handle);
187             _status = waiting_for_choice;
188         }
189     } else if (_status == sending_basic_auth_request) {
190         zmq_assert (_basic_auth_request_encoder.has_pending_data ());
191         const int rc = _basic_auth_request_encoder.output (_s);
192         if (rc == -1 || rc == 0)
193             error ();
194         else if (!_basic_auth_request_encoder.has_pending_data ()) {
195             reset_pollout (_handle);
196             set_pollin (_handle);
197             _status = waiting_for_auth_response;
198         }
199     } else {
200         zmq_assert (_request_encoder.has_pending_data ());
201         const int rc = _request_encoder.output (_s);
202         if (rc == -1 || rc == 0)
203             error ();
204         else if (!_request_encoder.has_pending_data ()) {
205             reset_pollout (_handle);
206             set_pollin (_handle);
207             _status = waiting_for_response;
208         }
209     }
210 }
211 
start_connecting()212 void zmq::socks_connecter_t::start_connecting ()
213 {
214     zmq_assert (_status == unplugged);
215 
216     //  Open the connecting socket.
217     const int rc = connect_to_proxy ();
218 
219     //  Connect may succeed in synchronous manner.
220     if (rc == 0) {
221         _handle = add_fd (_s);
222         set_pollout (_handle);
223         _status = sending_greeting;
224     }
225     //  Connection establishment may be delayed. Poll for its completion.
226     else if (errno == EINPROGRESS) {
227         _handle = add_fd (_s);
228         set_pollout (_handle);
229         _status = waiting_for_proxy_connection;
230         _socket->event_connect_delayed (
231           make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
232     }
233     //  Handle any other error condition by eventual reconnect.
234     else {
235         if (_s != retired_fd)
236             close ();
237         add_reconnect_timer ();
238     }
239 }
240 
process_server_response(const socks_choice_t & response_)241 int zmq::socks_connecter_t::process_server_response (
242   const socks_choice_t &response_)
243 {
244     return response_.method == socks_no_auth_required
245                || response_.method == socks_basic_auth
246              ? 0
247              : -1;
248 }
249 
process_server_response(const socks_response_t & response_)250 int zmq::socks_connecter_t::process_server_response (
251   const socks_response_t &response_)
252 {
253     return response_.response_code == 0 ? 0 : -1;
254 }
255 
process_server_response(const socks_auth_response_t & response_)256 int zmq::socks_connecter_t::process_server_response (
257   const socks_auth_response_t &response_)
258 {
259     return response_.response_code == 0 ? 0 : -1;
260 }
261 
error()262 void zmq::socks_connecter_t::error ()
263 {
264     rm_fd (_handle);
265     close ();
266     _greeting_encoder.reset ();
267     _choice_decoder.reset ();
268     _basic_auth_request_encoder.reset ();
269     _auth_response_decoder.reset ();
270     _request_encoder.reset ();
271     _response_decoder.reset ();
272     _status = unplugged;
273     add_reconnect_timer ();
274 }
275 
connect_to_proxy()276 int zmq::socks_connecter_t::connect_to_proxy ()
277 {
278     zmq_assert (_s == retired_fd);
279 
280     //  Resolve the address
281     if (_proxy_addr->resolved.tcp_addr != NULL) {
282         LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
283     }
284 
285     _proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
286     alloc_assert (_proxy_addr->resolved.tcp_addr);
287     //  Automatic fallback to ipv4 is disabled here since this was the existing
288     //  behaviour, however I don't see a real reason for this. Maybe this can
289     //  be changed to true (and then the parameter can be removed entirely).
290     _s = tcp_open_socket (_proxy_addr->address.c_str (), options, false, false,
291                           _proxy_addr->resolved.tcp_addr);
292     if (_s == retired_fd) {
293         //  TODO we should emit some event in this case!
294         LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
295         return -1;
296     }
297     zmq_assert (_proxy_addr->resolved.tcp_addr != NULL);
298 
299     // Set the socket to non-blocking mode so that we get async connect().
300     unblock_socket (_s);
301 
302     const tcp_address_t *const tcp_addr = _proxy_addr->resolved.tcp_addr;
303 
304     int rc;
305 
306     // Set a source address for conversations
307     if (tcp_addr->has_src_addr ()) {
308 #if defined ZMQ_HAVE_VXWORKS
309         rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
310                      tcp_addr->src_addrlen ());
311 #else
312         rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
313 #endif
314         if (rc == -1) {
315             close ();
316             return -1;
317         }
318     }
319 
320     //  Connect to the remote peer.
321 #if defined ZMQ_HAVE_VXWORKS
322     rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
323 #else
324     rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
325 #endif
326     //  Connect was successful immediately.
327     if (rc == 0)
328         return 0;
329 
330         //  Translate error codes indicating asynchronous connect has been
331         //  launched to a uniform EINPROGRESS.
332 #ifdef ZMQ_HAVE_WINDOWS
333     const int last_error = WSAGetLastError ();
334     if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
335         errno = EINPROGRESS;
336     else {
337         errno = wsa_error_to_errno (last_error);
338         close ();
339     }
340 #else
341     if (errno == EINTR)
342         errno = EINPROGRESS;
343 #endif
344     return -1;
345 }
346 
check_proxy_connection() const347 zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () const
348 {
349     //  Async connect has finished. Check whether an error occurred
350     int err = 0;
351 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
352     int len = sizeof err;
353 #else
354     socklen_t len = sizeof err;
355 #endif
356 
357     int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
358                          reinterpret_cast<char *> (&err), &len);
359 
360     //  Assert if the error was caused by 0MQ bug.
361     //  Networking problems are OK. No need to assert.
362 #ifdef ZMQ_HAVE_WINDOWS
363     zmq_assert (rc == 0);
364     if (err != 0) {
365         wsa_assert (err == WSAECONNREFUSED || err == WSAETIMEDOUT
366                     || err == WSAECONNABORTED || err == WSAEHOSTUNREACH
367                     || err == WSAENETUNREACH || err == WSAENETDOWN
368                     || err == WSAEACCES || err == WSAEINVAL
369                     || err == WSAEADDRINUSE);
370         return -1;
371     }
372 #else
373     //  Following code should handle both Berkeley-derived socket
374     //  implementations and Solaris.
375     if (rc == -1)
376         err = errno;
377     if (err != 0) {
378         errno = err;
379         errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
380                       || errno == ETIMEDOUT || errno == EHOSTUNREACH
381                       || errno == ENETUNREACH || errno == ENETDOWN
382                       || errno == EINVAL);
383         return -1;
384     }
385 #endif
386 
387     rc = tune_tcp_socket (_s);
388     rc = rc
389          | tune_tcp_keepalives (
390            _s, options.tcp_keepalive, options.tcp_keepalive_cnt,
391            options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
392     if (rc != 0)
393         return -1;
394 
395     return 0;
396 }
397 
parse_address(const std::string & address_,std::string & hostname_,uint16_t & port_)398 int zmq::socks_connecter_t::parse_address (const std::string &address_,
399                                            std::string &hostname_,
400                                            uint16_t &port_)
401 {
402     //  Find the ':' at end that separates address from the port number.
403     const size_t idx = address_.rfind (':');
404     if (idx == std::string::npos) {
405         errno = EINVAL;
406         return -1;
407     }
408 
409     //  Extract hostname
410     if (idx < 2 || address_[0] != '[' || address_[idx - 1] != ']')
411         hostname_ = address_.substr (0, idx);
412     else
413         hostname_ = address_.substr (1, idx - 2);
414 
415     //  Separate the hostname/port.
416     const std::string port_str = address_.substr (idx + 1);
417     //  Parse the port number (0 is not a valid port).
418     port_ = static_cast<uint16_t> (atoi (port_str.c_str ()));
419     if (port_ == 0) {
420         errno = EINVAL;
421         return -1;
422     }
423     return 0;
424 }
425