1 /*
2     Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 
34 #include "macros.hpp"
35 #include "ws_connecter.hpp"
36 #include "io_thread.hpp"
37 #include "err.hpp"
38 #include "ip.hpp"
39 #include "tcp.hpp"
40 #include "address.hpp"
41 #include "ws_address.hpp"
42 #include "ws_engine.hpp"
43 #include "session_base.hpp"
44 
45 #ifdef ZMQ_HAVE_WSS
46 #include "wss_engine.hpp"
47 #include "wss_address.hpp"
48 #endif
49 
50 #if !defined ZMQ_HAVE_WINDOWS
51 #include <unistd.h>
52 #include <sys/types.h>
53 #include <sys/socket.h>
54 #include <arpa/inet.h>
55 #include <netinet/tcp.h>
56 #include <netinet/in.h>
57 #include <netdb.h>
58 #include <fcntl.h>
59 #ifdef ZMQ_HAVE_VXWORKS
60 #include <sockLib.h>
61 #endif
62 #ifdef ZMQ_HAVE_OPENVMS
63 #include <ioctl.h>
64 #endif
65 #endif
66 
67 #ifdef __APPLE__
68 #include <TargetConditionals.h>
69 #endif
70 
ws_connecter_t(class io_thread_t * io_thread_,class session_base_t * session_,const options_t & options_,address_t * addr_,bool delayed_start_,bool wss_,const std::string & tls_hostname_)71 zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_,
72                                      class session_base_t *session_,
73                                      const options_t &options_,
74                                      address_t *addr_,
75                                      bool delayed_start_,
76                                      bool wss_,
77                                      const std::string &tls_hostname_) :
78     stream_connecter_base_t (
79       io_thread_, session_, options_, addr_, delayed_start_),
80     _connect_timer_started (false),
81     _wss (wss_),
82     _hostname (tls_hostname_)
83 {
84 }
85 
~ws_connecter_t()86 zmq::ws_connecter_t::~ws_connecter_t ()
87 {
88     zmq_assert (!_connect_timer_started);
89 }
90 
process_term(int linger_)91 void zmq::ws_connecter_t::process_term (int linger_)
92 {
93     if (_connect_timer_started) {
94         cancel_timer (connect_timer_id);
95         _connect_timer_started = false;
96     }
97 
98     stream_connecter_base_t::process_term (linger_);
99 }
100 
out_event()101 void zmq::ws_connecter_t::out_event ()
102 {
103     if (_connect_timer_started) {
104         cancel_timer (connect_timer_id);
105         _connect_timer_started = false;
106     }
107 
108     //  TODO this is still very similar to (t)ipc_connecter_t, maybe the
109     //  differences can be factored out
110 
111     rm_handle ();
112 
113     const fd_t fd = connect ();
114 
115     //  Handle the error condition by attempt to reconnect.
116     if (fd == retired_fd || !tune_socket (fd)) {
117         close ();
118         add_reconnect_timer ();
119         return;
120     }
121 
122     if (_wss)
123 #ifdef ZMQ_HAVE_WSS
124         create_engine (fd,
125                        get_socket_name<wss_address_t> (fd, socket_end_local));
126 #else
127         assert (false);
128 #endif
129     else
130         create_engine (fd,
131                        get_socket_name<ws_address_t> (fd, socket_end_local));
132 }
133 
timer_event(int id_)134 void zmq::ws_connecter_t::timer_event (int id_)
135 {
136     if (id_ == connect_timer_id) {
137         _connect_timer_started = false;
138         rm_handle ();
139         close ();
140         add_reconnect_timer ();
141     } else
142         stream_connecter_base_t::timer_event (id_);
143 }
144 
start_connecting()145 void zmq::ws_connecter_t::start_connecting ()
146 {
147     //  Open the connecting socket.
148     const int rc = open ();
149 
150     //  Connect may succeed in synchronous manner.
151     if (rc == 0) {
152         _handle = add_fd (_s);
153         out_event ();
154     }
155 
156     //  Connection establishment may be delayed. Poll for its completion.
157     else if (rc == -1 && errno == EINPROGRESS) {
158         _handle = add_fd (_s);
159         set_pollout (_handle);
160         _socket->event_connect_delayed (
161           make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
162 
163         //  add userspace connect timeout
164         add_connect_timer ();
165     }
166 
167     //  Handle any other error condition by eventual reconnect.
168     else {
169         if (_s != retired_fd)
170             close ();
171         add_reconnect_timer ();
172     }
173 }
174 
add_connect_timer()175 void zmq::ws_connecter_t::add_connect_timer ()
176 {
177     if (options.connect_timeout > 0) {
178         add_timer (options.connect_timeout, connect_timer_id);
179         _connect_timer_started = true;
180     }
181 }
182 
open()183 int zmq::ws_connecter_t::open ()
184 {
185     zmq_assert (_s == retired_fd);
186 
187     tcp_address_t tcp_addr;
188     _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
189                           &tcp_addr);
190     if (_s == retired_fd)
191         return -1;
192 
193     // Set the socket to non-blocking mode so that we get async connect().
194     unblock_socket (_s);
195 
196     //  Connect to the remote peer.
197 #ifdef ZMQ_HAVE_VXWORKS
198     int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ());
199 #else
200     const int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
201 #endif
202     //  Connect was successful immediately.
203     if (rc == 0) {
204         return 0;
205     }
206 
207     //  Translate error codes indicating asynchronous connect has been
208     //  launched to a uniform EINPROGRESS.
209 #ifdef ZMQ_HAVE_WINDOWS
210     const int last_error = WSAGetLastError ();
211     if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
212         errno = EINPROGRESS;
213     else
214         errno = wsa_error_to_errno (last_error);
215 #else
216     if (errno == EINTR)
217         errno = EINPROGRESS;
218 #endif
219     return -1;
220 }
221 
connect()222 zmq::fd_t zmq::ws_connecter_t::connect ()
223 {
224     //  Async connect has finished. Check whether an error occurred
225     int err = 0;
226 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
227     int len = sizeof err;
228 #else
229     socklen_t len = sizeof err;
230 #endif
231 
232     const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
233                                reinterpret_cast<char *> (&err), &len);
234 
235     //  Assert if the error was caused by 0MQ bug.
236     //  Networking problems are OK. No need to assert.
237 #ifdef ZMQ_HAVE_WINDOWS
238     zmq_assert (rc == 0);
239     if (err != 0) {
240         if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
241             || err == WSAENOBUFS) {
242             wsa_assert_no (err);
243         }
244         return retired_fd;
245     }
246 #else
247     //  Following code should handle both Berkeley-derived socket
248     //  implementations and Solaris.
249     if (rc == -1)
250         err = errno;
251     if (err != 0) {
252         errno = err;
253 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
254         errno_assert (errno != EBADF && errno != ENOPROTOOPT
255                       && errno != ENOTSOCK && errno != ENOBUFS);
256 #else
257         errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
258                       && errno != ENOBUFS);
259 #endif
260         return retired_fd;
261     }
262 #endif
263 
264     //  Return the newly connected socket.
265     const fd_t result = _s;
266     _s = retired_fd;
267     return result;
268 }
269 
tune_socket(const fd_t fd_)270 bool zmq::ws_connecter_t::tune_socket (const fd_t fd_)
271 {
272     const int rc =
273       tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt);
274     return rc == 0;
275 }
276 
create_engine(fd_t fd_,const std::string & local_address_)277 void zmq::ws_connecter_t::create_engine (fd_t fd_,
278                                          const std::string &local_address_)
279 {
280     const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
281                                              endpoint_type_connect);
282 
283     //  Create the engine object for this connection.
284     i_engine *engine = NULL;
285     if (_wss) {
286 #ifdef ZMQ_HAVE_WSS
287         engine = new (std::nothrow)
288           wss_engine_t (fd_, options, endpoint_pair, *_addr->resolved.ws_addr,
289                         true, NULL, _hostname);
290 #else
291         LIBZMQ_UNUSED (_hostname);
292         assert (false);
293 #endif
294     } else
295         engine = new (std::nothrow) ws_engine_t (
296           fd_, options, endpoint_pair, *_addr->resolved.ws_addr, true);
297     alloc_assert (engine);
298 
299     //  Attach the engine to the corresponding session object.
300     send_attach (_session, engine);
301 
302     //  Shut the connecter down.
303     terminate ();
304 
305     _socket->event_connected (endpoint_pair, fd_);
306 }
307