1 /*
2 Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33
34 #include "macros.hpp"
35 #include "ws_connecter.hpp"
36 #include "io_thread.hpp"
37 #include "err.hpp"
38 #include "ip.hpp"
39 #include "tcp.hpp"
40 #include "address.hpp"
41 #include "ws_address.hpp"
42 #include "ws_engine.hpp"
43 #include "session_base.hpp"
44
45 #ifdef ZMQ_HAVE_WSS
46 #include "wss_engine.hpp"
47 #include "wss_address.hpp"
48 #endif
49
50 #if !defined ZMQ_HAVE_WINDOWS
51 #include <unistd.h>
52 #include <sys/types.h>
53 #include <sys/socket.h>
54 #include <arpa/inet.h>
55 #include <netinet/tcp.h>
56 #include <netinet/in.h>
57 #include <netdb.h>
58 #include <fcntl.h>
59 #ifdef ZMQ_HAVE_VXWORKS
60 #include <sockLib.h>
61 #endif
62 #ifdef ZMQ_HAVE_OPENVMS
63 #include <ioctl.h>
64 #endif
65 #endif
66
67 #ifdef __APPLE__
68 #include <TargetConditionals.h>
69 #endif
70
ws_connecter_t(class io_thread_t * io_thread_,class session_base_t * session_,const options_t & options_,address_t * addr_,bool delayed_start_,bool wss_,const std::string & tls_hostname_)71 zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_,
72 class session_base_t *session_,
73 const options_t &options_,
74 address_t *addr_,
75 bool delayed_start_,
76 bool wss_,
77 const std::string &tls_hostname_) :
78 stream_connecter_base_t (
79 io_thread_, session_, options_, addr_, delayed_start_),
80 _connect_timer_started (false),
81 _wss (wss_),
82 _hostname (tls_hostname_)
83 {
84 }
85
~ws_connecter_t()86 zmq::ws_connecter_t::~ws_connecter_t ()
87 {
88 zmq_assert (!_connect_timer_started);
89 }
90
process_term(int linger_)91 void zmq::ws_connecter_t::process_term (int linger_)
92 {
93 if (_connect_timer_started) {
94 cancel_timer (connect_timer_id);
95 _connect_timer_started = false;
96 }
97
98 stream_connecter_base_t::process_term (linger_);
99 }
100
out_event()101 void zmq::ws_connecter_t::out_event ()
102 {
103 if (_connect_timer_started) {
104 cancel_timer (connect_timer_id);
105 _connect_timer_started = false;
106 }
107
108 // TODO this is still very similar to (t)ipc_connecter_t, maybe the
109 // differences can be factored out
110
111 rm_handle ();
112
113 const fd_t fd = connect ();
114
115 // Handle the error condition by attempt to reconnect.
116 if (fd == retired_fd || !tune_socket (fd)) {
117 close ();
118 add_reconnect_timer ();
119 return;
120 }
121
122 if (_wss)
123 #ifdef ZMQ_HAVE_WSS
124 create_engine (fd,
125 get_socket_name<wss_address_t> (fd, socket_end_local));
126 #else
127 assert (false);
128 #endif
129 else
130 create_engine (fd,
131 get_socket_name<ws_address_t> (fd, socket_end_local));
132 }
133
timer_event(int id_)134 void zmq::ws_connecter_t::timer_event (int id_)
135 {
136 if (id_ == connect_timer_id) {
137 _connect_timer_started = false;
138 rm_handle ();
139 close ();
140 add_reconnect_timer ();
141 } else
142 stream_connecter_base_t::timer_event (id_);
143 }
144
start_connecting()145 void zmq::ws_connecter_t::start_connecting ()
146 {
147 // Open the connecting socket.
148 const int rc = open ();
149
150 // Connect may succeed in synchronous manner.
151 if (rc == 0) {
152 _handle = add_fd (_s);
153 out_event ();
154 }
155
156 // Connection establishment may be delayed. Poll for its completion.
157 else if (rc == -1 && errno == EINPROGRESS) {
158 _handle = add_fd (_s);
159 set_pollout (_handle);
160 _socket->event_connect_delayed (
161 make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
162
163 // add userspace connect timeout
164 add_connect_timer ();
165 }
166
167 // Handle any other error condition by eventual reconnect.
168 else {
169 if (_s != retired_fd)
170 close ();
171 add_reconnect_timer ();
172 }
173 }
174
add_connect_timer()175 void zmq::ws_connecter_t::add_connect_timer ()
176 {
177 if (options.connect_timeout > 0) {
178 add_timer (options.connect_timeout, connect_timer_id);
179 _connect_timer_started = true;
180 }
181 }
182
open()183 int zmq::ws_connecter_t::open ()
184 {
185 zmq_assert (_s == retired_fd);
186
187 tcp_address_t tcp_addr;
188 _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
189 &tcp_addr);
190 if (_s == retired_fd)
191 return -1;
192
193 // Set the socket to non-blocking mode so that we get async connect().
194 unblock_socket (_s);
195
196 // Connect to the remote peer.
197 #ifdef ZMQ_HAVE_VXWORKS
198 int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ());
199 #else
200 const int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
201 #endif
202 // Connect was successful immediately.
203 if (rc == 0) {
204 return 0;
205 }
206
207 // Translate error codes indicating asynchronous connect has been
208 // launched to a uniform EINPROGRESS.
209 #ifdef ZMQ_HAVE_WINDOWS
210 const int last_error = WSAGetLastError ();
211 if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
212 errno = EINPROGRESS;
213 else
214 errno = wsa_error_to_errno (last_error);
215 #else
216 if (errno == EINTR)
217 errno = EINPROGRESS;
218 #endif
219 return -1;
220 }
221
connect()222 zmq::fd_t zmq::ws_connecter_t::connect ()
223 {
224 // Async connect has finished. Check whether an error occurred
225 int err = 0;
226 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
227 int len = sizeof err;
228 #else
229 socklen_t len = sizeof err;
230 #endif
231
232 const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
233 reinterpret_cast<char *> (&err), &len);
234
235 // Assert if the error was caused by 0MQ bug.
236 // Networking problems are OK. No need to assert.
237 #ifdef ZMQ_HAVE_WINDOWS
238 zmq_assert (rc == 0);
239 if (err != 0) {
240 if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
241 || err == WSAENOBUFS) {
242 wsa_assert_no (err);
243 }
244 return retired_fd;
245 }
246 #else
247 // Following code should handle both Berkeley-derived socket
248 // implementations and Solaris.
249 if (rc == -1)
250 err = errno;
251 if (err != 0) {
252 errno = err;
253 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
254 errno_assert (errno != EBADF && errno != ENOPROTOOPT
255 && errno != ENOTSOCK && errno != ENOBUFS);
256 #else
257 errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
258 && errno != ENOBUFS);
259 #endif
260 return retired_fd;
261 }
262 #endif
263
264 // Return the newly connected socket.
265 const fd_t result = _s;
266 _s = retired_fd;
267 return result;
268 }
269
tune_socket(const fd_t fd_)270 bool zmq::ws_connecter_t::tune_socket (const fd_t fd_)
271 {
272 const int rc =
273 tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt);
274 return rc == 0;
275 }
276
create_engine(fd_t fd_,const std::string & local_address_)277 void zmq::ws_connecter_t::create_engine (fd_t fd_,
278 const std::string &local_address_)
279 {
280 const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
281 endpoint_type_connect);
282
283 // Create the engine object for this connection.
284 i_engine *engine = NULL;
285 if (_wss) {
286 #ifdef ZMQ_HAVE_WSS
287 engine = new (std::nothrow)
288 wss_engine_t (fd_, options, endpoint_pair, *_addr->resolved.ws_addr,
289 true, NULL, _hostname);
290 #else
291 LIBZMQ_UNUSED (_hostname);
292 assert (false);
293 #endif
294 } else
295 engine = new (std::nothrow) ws_engine_t (
296 fd_, options, endpoint_pair, *_addr->resolved.ws_addr, true);
297 alloc_assert (engine);
298
299 // Attach the engine to the corresponding session object.
300 send_attach (_session, engine);
301
302 // Shut the connecter down.
303 terminate ();
304
305 _socket->event_connected (endpoint_pair, fd_);
306 }
307