1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 
34 #include "macros.hpp"
35 #include "tcp_connecter.hpp"
36 #include "io_thread.hpp"
37 #include "err.hpp"
38 #include "ip.hpp"
39 #include "tcp.hpp"
40 #include "address.hpp"
41 #include "tcp_address.hpp"
42 #include "session_base.hpp"
43 
44 #if !defined ZMQ_HAVE_WINDOWS
45 #include <unistd.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <arpa/inet.h>
49 #include <netinet/tcp.h>
50 #include <netinet/in.h>
51 #include <netdb.h>
52 #include <fcntl.h>
53 #ifdef ZMQ_HAVE_VXWORKS
54 #include <sockLib.h>
55 #endif
56 #ifdef ZMQ_HAVE_OPENVMS
57 #include <ioctl.h>
58 #endif
59 #endif
60 
61 #ifdef __APPLE__
62 #include <TargetConditionals.h>
63 #endif
64 
tcp_connecter_t(class io_thread_t * io_thread_,class session_base_t * session_,const options_t & options_,address_t * addr_,bool delayed_start_)65 zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
66                                        class session_base_t *session_,
67                                        const options_t &options_,
68                                        address_t *addr_,
69                                        bool delayed_start_) :
70     stream_connecter_base_t (
71       io_thread_, session_, options_, addr_, delayed_start_),
72     _connect_timer_started (false)
73 {
74     zmq_assert (_addr->protocol == protocol_name::tcp);
75 }
76 
~tcp_connecter_t()77 zmq::tcp_connecter_t::~tcp_connecter_t ()
78 {
79     zmq_assert (!_connect_timer_started);
80 }
81 
process_term(int linger_)82 void zmq::tcp_connecter_t::process_term (int linger_)
83 {
84     if (_connect_timer_started) {
85         cancel_timer (connect_timer_id);
86         _connect_timer_started = false;
87     }
88 
89     stream_connecter_base_t::process_term (linger_);
90 }
91 
out_event()92 void zmq::tcp_connecter_t::out_event ()
93 {
94     if (_connect_timer_started) {
95         cancel_timer (connect_timer_id);
96         _connect_timer_started = false;
97     }
98 
99     //  TODO this is still very similar to (t)ipc_connecter_t, maybe the
100     //  differences can be factored out
101 
102     rm_handle ();
103 
104     const fd_t fd = connect ();
105 
106     if (fd == retired_fd
107         && ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
108             && errno == ECONNREFUSED)) {
109         send_conn_failed (_session);
110         close ();
111         terminate ();
112         return;
113     }
114 
115     //  Handle the error condition by attempt to reconnect.
116     if (fd == retired_fd || !tune_socket (fd)) {
117         close ();
118         add_reconnect_timer ();
119         return;
120     }
121 
122     create_engine (fd, get_socket_name<tcp_address_t> (fd, socket_end_local));
123 }
124 
timer_event(int id_)125 void zmq::tcp_connecter_t::timer_event (int id_)
126 {
127     if (id_ == connect_timer_id) {
128         _connect_timer_started = false;
129         rm_handle ();
130         close ();
131         add_reconnect_timer ();
132     } else
133         stream_connecter_base_t::timer_event (id_);
134 }
135 
start_connecting()136 void zmq::tcp_connecter_t::start_connecting ()
137 {
138     //  Open the connecting socket.
139     const int rc = open ();
140 
141     //  Connect may succeed in synchronous manner.
142     if (rc == 0) {
143         _handle = add_fd (_s);
144         out_event ();
145     }
146 
147     //  Connection establishment may be delayed. Poll for its completion.
148     else if (rc == -1 && errno == EINPROGRESS) {
149         _handle = add_fd (_s);
150         set_pollout (_handle);
151         _socket->event_connect_delayed (
152           make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
153 
154         //  add userspace connect timeout
155         add_connect_timer ();
156     }
157 
158     //  Handle any other error condition by eventual reconnect.
159     else {
160         if (_s != retired_fd)
161             close ();
162         add_reconnect_timer ();
163     }
164 }
165 
add_connect_timer()166 void zmq::tcp_connecter_t::add_connect_timer ()
167 {
168     if (options.connect_timeout > 0) {
169         add_timer (options.connect_timeout, connect_timer_id);
170         _connect_timer_started = true;
171     }
172 }
173 
open()174 int zmq::tcp_connecter_t::open ()
175 {
176     zmq_assert (_s == retired_fd);
177 
178     //  Resolve the address
179     if (_addr->resolved.tcp_addr != NULL) {
180         LIBZMQ_DELETE (_addr->resolved.tcp_addr);
181     }
182 
183     _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
184     alloc_assert (_addr->resolved.tcp_addr);
185     _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
186                           _addr->resolved.tcp_addr);
187     if (_s == retired_fd) {
188         //  TODO we should emit some event in this case!
189 
190         LIBZMQ_DELETE (_addr->resolved.tcp_addr);
191         return -1;
192     }
193     zmq_assert (_addr->resolved.tcp_addr != NULL);
194 
195     // Set the socket to non-blocking mode so that we get async connect().
196     unblock_socket (_s);
197 
198     const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
199 
200     int rc;
201 
202     // Set a source address for conversations
203     if (tcp_addr->has_src_addr ()) {
204         //  Allow reusing of the address, to connect to different servers
205         //  using the same source port on the client.
206         int flag = 1;
207 #ifdef ZMQ_HAVE_WINDOWS
208         rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR,
209                          reinterpret_cast<const char *> (&flag), sizeof (int));
210         wsa_assert (rc != SOCKET_ERROR);
211 #elif defined ZMQ_HAVE_VXWORKS
212         rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
213                          sizeof (int));
214         errno_assert (rc == 0);
215 #else
216         rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
217         errno_assert (rc == 0);
218 #endif
219 
220 #if defined ZMQ_HAVE_VXWORKS
221         rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
222                      tcp_addr->src_addrlen ());
223 #else
224         rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
225 #endif
226         if (rc == -1)
227             return -1;
228     }
229 
230     //  Connect to the remote peer.
231 #if defined ZMQ_HAVE_VXWORKS
232     rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
233 #else
234     rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
235 #endif
236     //  Connect was successful immediately.
237     if (rc == 0) {
238         return 0;
239     }
240 
241     //  Translate error codes indicating asynchronous connect has been
242     //  launched to a uniform EINPROGRESS.
243 #ifdef ZMQ_HAVE_WINDOWS
244     const int last_error = WSAGetLastError ();
245     if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
246         errno = EINPROGRESS;
247     else
248         errno = wsa_error_to_errno (last_error);
249 #else
250     if (errno == EINTR)
251         errno = EINPROGRESS;
252 #endif
253     return -1;
254 }
255 
connect()256 zmq::fd_t zmq::tcp_connecter_t::connect ()
257 {
258     //  Async connect has finished. Check whether an error occurred
259     int err = 0;
260 #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
261     int len = sizeof err;
262 #else
263     socklen_t len = sizeof err;
264 #endif
265 
266     const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
267                                reinterpret_cast<char *> (&err), &len);
268 
269     //  Assert if the error was caused by 0MQ bug.
270     //  Networking problems are OK. No need to assert.
271 #ifdef ZMQ_HAVE_WINDOWS
272     zmq_assert (rc == 0);
273     if (err != 0) {
274         if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
275             || err == WSAENOBUFS) {
276             wsa_assert_no (err);
277         }
278         errno = wsa_error_to_errno (err);
279         return retired_fd;
280     }
281 #else
282     //  Following code should handle both Berkeley-derived socket
283     //  implementations and Solaris.
284     if (rc == -1)
285         err = errno;
286     if (err != 0) {
287         errno = err;
288 #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
289         errno_assert (errno != EBADF && errno != ENOPROTOOPT
290                       && errno != ENOTSOCK && errno != ENOBUFS);
291 #else
292         errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
293                       && errno != ENOBUFS);
294 #endif
295         return retired_fd;
296     }
297 #endif
298 
299     //  Return the newly connected socket.
300     const fd_t result = _s;
301     _s = retired_fd;
302     return result;
303 }
304 
tune_socket(const fd_t fd_)305 bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
306 {
307     const int rc = tune_tcp_socket (fd_)
308                    | tune_tcp_keepalives (
309                      fd_, options.tcp_keepalive, options.tcp_keepalive_cnt,
310                      options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
311                    | tune_tcp_maxrt (fd_, options.tcp_maxrt);
312     return rc == 0;
313 }
314