1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33
34 #include "macros.hpp"
35 #include "tcp_connecter.hpp"
36 #include "io_thread.hpp"
37 #include "err.hpp"
38 #include "ip.hpp"
39 #include "tcp.hpp"
40 #include "address.hpp"
41 #include "tcp_address.hpp"
42 #include "session_base.hpp"
43
44 #if !defined ZMQ_HAVE_WINDOWS
45 #include <unistd.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <arpa/inet.h>
49 #include <netinet/tcp.h>
50 #include <netinet/in.h>
51 #include <netdb.h>
52 #include <fcntl.h>
53 #ifdef ZMQ_HAVE_VXWORKS
54 #include <sockLib.h>
55 #endif
56 #ifdef ZMQ_HAVE_OPENVMS
57 #include <ioctl.h>
58 #endif
59 #endif
60
61 #ifdef __APPLE__
62 #include <TargetConditionals.h>
63 #endif
64
tcp_connecter_t(class io_thread_t * io_thread_,class session_base_t * session_,const options_t & options_,address_t * addr_,bool delayed_start_)65 zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
66 class session_base_t *session_,
67 const options_t &options_,
68 address_t *addr_,
69 bool delayed_start_) :
70 stream_connecter_base_t (
71 io_thread_, session_, options_, addr_, delayed_start_),
72 _connect_timer_started (false)
73 {
74 zmq_assert (_addr->protocol == protocol_name::tcp);
75 }
76
~tcp_connecter_t()77 zmq::tcp_connecter_t::~tcp_connecter_t ()
78 {
79 zmq_assert (!_connect_timer_started);
80 }
81
process_term(int linger_)82 void zmq::tcp_connecter_t::process_term (int linger_)
83 {
84 if (_connect_timer_started) {
85 cancel_timer (connect_timer_id);
86 _connect_timer_started = false;
87 }
88
89 stream_connecter_base_t::process_term (linger_);
90 }
91
out_event()92 void zmq::tcp_connecter_t::out_event ()
93 {
94 if (_connect_timer_started) {
95 cancel_timer (connect_timer_id);
96 _connect_timer_started = false;
97 }
98
99 // TODO this is still very similar to (t)ipc_connecter_t, maybe the
100 // differences can be factored out
101
102 rm_handle ();
103
104 const fd_t fd = connect ();
105
106 if (fd == retired_fd
107 && ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
108 && errno == ECONNREFUSED)) {
109 send_conn_failed (_session);
110 close ();
111 terminate ();
112 return;
113 }
114
115 // Handle the error condition by attempt to reconnect.
116 if (fd == retired_fd || !tune_socket (fd)) {
117 close ();
118 add_reconnect_timer ();
119 return;
120 }
121
122 create_engine (fd, get_socket_name<tcp_address_t> (fd, socket_end_local));
123 }
124
timer_event(int id_)125 void zmq::tcp_connecter_t::timer_event (int id_)
126 {
127 if (id_ == connect_timer_id) {
128 _connect_timer_started = false;
129 rm_handle ();
130 close ();
131 add_reconnect_timer ();
132 } else
133 stream_connecter_base_t::timer_event (id_);
134 }
135
start_connecting()136 void zmq::tcp_connecter_t::start_connecting ()
137 {
138 // Open the connecting socket.
139 const int rc = open ();
140
141 // Connect may succeed in synchronous manner.
142 if (rc == 0) {
143 _handle = add_fd (_s);
144 out_event ();
145 }
146
147 // Connection establishment may be delayed. Poll for its completion.
148 else if (rc == -1 && errno == EINPROGRESS) {
149 _handle = add_fd (_s);
150 set_pollout (_handle);
151 _socket->event_connect_delayed (
152 make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
153
154 // add userspace connect timeout
155 add_connect_timer ();
156 }
157
158 // Handle any other error condition by eventual reconnect.
159 else {
160 if (_s != retired_fd)
161 close ();
162 add_reconnect_timer ();
163 }
164 }
165
add_connect_timer()166 void zmq::tcp_connecter_t::add_connect_timer ()
167 {
168 if (options.connect_timeout > 0) {
169 add_timer (options.connect_timeout, connect_timer_id);
170 _connect_timer_started = true;
171 }
172 }
173
open()174 int zmq::tcp_connecter_t::open ()
175 {
176 zmq_assert (_s == retired_fd);
177
178 // Resolve the address
179 if (_addr->resolved.tcp_addr != NULL) {
180 LIBZMQ_DELETE (_addr->resolved.tcp_addr);
181 }
182
183 _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
184 alloc_assert (_addr->resolved.tcp_addr);
185 _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
186 _addr->resolved.tcp_addr);
187 if (_s == retired_fd) {
188 // TODO we should emit some event in this case!
189
190 LIBZMQ_DELETE (_addr->resolved.tcp_addr);
191 return -1;
192 }
193 zmq_assert (_addr->resolved.tcp_addr != NULL);
194
195 // Set the socket to non-blocking mode so that we get async connect().
196 unblock_socket (_s);
197
198 const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
199
200 int rc;
201
202 // Set a source address for conversations
203 if (tcp_addr->has_src_addr ()) {
204 // Allow reusing of the address, to connect to different servers
205 // using the same source port on the client.
206 int flag = 1;
207 #ifdef ZMQ_HAVE_WINDOWS
208 rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR,
209 reinterpret_cast<const char *> (&flag), sizeof (int));
210 wsa_assert (rc != SOCKET_ERROR);
211 #elif defined ZMQ_HAVE_VXWORKS
212 rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
213 sizeof (int));
214 errno_assert (rc == 0);
215 #else
216 rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
217 errno_assert (rc == 0);
218 #endif
219
220 #if defined ZMQ_HAVE_VXWORKS
221 rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
222 tcp_addr->src_addrlen ());
223 #else
224 rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
225 #endif
226 if (rc == -1)
227 return -1;
228 }
229
230 // Connect to the remote peer.
231 #if defined ZMQ_HAVE_VXWORKS
232 rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
233 #else
234 rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
235 #endif
236 // Connect was successful immediately.
237 if (rc == 0) {
238 return 0;
239 }
240
241 // Translate error codes indicating asynchronous connect has been
242 // launched to a uniform EINPROGRESS.
243 #ifdef ZMQ_HAVE_WINDOWS
244 const int last_error = WSAGetLastError ();
245 if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
246 errno = EINPROGRESS;
247 else
248 errno = wsa_error_to_errno (last_error);
249 #else
250 if (errno == EINTR)
251 errno = EINPROGRESS;
252 #endif
253 return -1;
254 }
255
connect()256 zmq::fd_t zmq::tcp_connecter_t::connect ()
257 {
258 // Async connect has finished. Check whether an error occurred
259 int err = 0;
260 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
261 int len = sizeof err;
262 #else
263 socklen_t len = sizeof err;
264 #endif
265
266 const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
267 reinterpret_cast<char *> (&err), &len);
268
269 // Assert if the error was caused by 0MQ bug.
270 // Networking problems are OK. No need to assert.
271 #ifdef ZMQ_HAVE_WINDOWS
272 zmq_assert (rc == 0);
273 if (err != 0) {
274 if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
275 || err == WSAENOBUFS) {
276 wsa_assert_no (err);
277 }
278 errno = wsa_error_to_errno (err);
279 return retired_fd;
280 }
281 #else
282 // Following code should handle both Berkeley-derived socket
283 // implementations and Solaris.
284 if (rc == -1)
285 err = errno;
286 if (err != 0) {
287 errno = err;
288 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
289 errno_assert (errno != EBADF && errno != ENOPROTOOPT
290 && errno != ENOTSOCK && errno != ENOBUFS);
291 #else
292 errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
293 && errno != ENOBUFS);
294 #endif
295 return retired_fd;
296 }
297 #endif
298
299 // Return the newly connected socket.
300 const fd_t result = _s;
301 _s = retired_fd;
302 return result;
303 }
304
tune_socket(const fd_t fd_)305 bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
306 {
307 const int rc = tune_tcp_socket (fd_)
308 | tune_tcp_keepalives (
309 fd_, options.tcp_keepalive, options.tcp_keepalive_cnt,
310 options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
311 | tune_tcp_maxrt (fd_, options.tcp_maxrt);
312 return rc == 0;
313 }
314