1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "stream_connecter_base.hpp"
32 #include "session_base.hpp"
33 #include "address.hpp"
34 #include "random.hpp"
35 #include "zmtp_engine.hpp"
36 #include "raw_engine.hpp"
37 
38 #ifndef ZMQ_HAVE_WINDOWS
39 #include <unistd.h>
40 #else
41 #include <winsock2.h>
42 #endif
43 
44 #include <limits>
45 
stream_connecter_base_t(zmq::io_thread_t * io_thread_,zmq::session_base_t * session_,const zmq::options_t & options_,zmq::address_t * addr_,bool delayed_start_)46 zmq::stream_connecter_base_t::stream_connecter_base_t (
47   zmq::io_thread_t *io_thread_,
48   zmq::session_base_t *session_,
49   const zmq::options_t &options_,
50   zmq::address_t *addr_,
51   bool delayed_start_) :
52     own_t (io_thread_, options_),
53     io_object_t (io_thread_),
54     _addr (addr_),
55     _s (retired_fd),
56     _handle (static_cast<handle_t> (NULL)),
57     _socket (session_->get_socket ()),
58     _delayed_start (delayed_start_),
59     _reconnect_timer_started (false),
60     _current_reconnect_ivl (options.reconnect_ivl),
61     _session (session_)
62 {
63     zmq_assert (_addr);
64     _addr->to_string (_endpoint);
65     // TODO the return value is unused! what if it fails? if this is impossible
66     // or does not matter, change such that endpoint in initialized using an
67     // initializer, and make endpoint const
68 }
69 
~stream_connecter_base_t()70 zmq::stream_connecter_base_t::~stream_connecter_base_t ()
71 {
72     zmq_assert (!_reconnect_timer_started);
73     zmq_assert (!_handle);
74     zmq_assert (_s == retired_fd);
75 }
76 
process_plug()77 void zmq::stream_connecter_base_t::process_plug ()
78 {
79     if (_delayed_start)
80         add_reconnect_timer ();
81     else
82         start_connecting ();
83 }
84 
process_term(int linger_)85 void zmq::stream_connecter_base_t::process_term (int linger_)
86 {
87     if (_reconnect_timer_started) {
88         cancel_timer (reconnect_timer_id);
89         _reconnect_timer_started = false;
90     }
91 
92     if (_handle) {
93         rm_handle ();
94     }
95 
96     if (_s != retired_fd)
97         close ();
98 
99     own_t::process_term (linger_);
100 }
101 
add_reconnect_timer()102 void zmq::stream_connecter_base_t::add_reconnect_timer ()
103 {
104     if (options.reconnect_ivl > 0) {
105         const int interval = get_new_reconnect_ivl ();
106         add_timer (interval, reconnect_timer_id);
107         _socket->event_connect_retried (
108           make_unconnected_connect_endpoint_pair (_endpoint), interval);
109         _reconnect_timer_started = true;
110     }
111 }
112 
get_new_reconnect_ivl()113 int zmq::stream_connecter_base_t::get_new_reconnect_ivl ()
114 {
115     //  TODO should the random jitter be really based on the configured initial
116     //  reconnect interval options.reconnect_ivl, or better on the
117     //  _current_reconnect_ivl?
118 
119     //  The new interval is the current interval + random value.
120     const int random_jitter = generate_random () % options.reconnect_ivl;
121     const int interval =
122       _current_reconnect_ivl < std::numeric_limits<int>::max () - random_jitter
123         ? _current_reconnect_ivl + random_jitter
124         : std::numeric_limits<int>::max ();
125 
126     //  Only change the new current reconnect interval if the maximum reconnect
127     //  interval was set and if it's larger than the reconnect interval.
128     if (options.reconnect_ivl_max > 0
129         && options.reconnect_ivl_max > options.reconnect_ivl) {
130         //  Calculate the next interval
131         _current_reconnect_ivl =
132           _current_reconnect_ivl < std::numeric_limits<int>::max () / 2
133             ? std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max)
134             : options.reconnect_ivl_max;
135     }
136 
137     return interval;
138 }
139 
rm_handle()140 void zmq::stream_connecter_base_t::rm_handle ()
141 {
142     rm_fd (_handle);
143     _handle = static_cast<handle_t> (NULL);
144 }
145 
close()146 void zmq::stream_connecter_base_t::close ()
147 {
148     // TODO before, this was an assertion for _s != retired_fd, but this does not match usage of close
149     if (_s != retired_fd) {
150 #ifdef ZMQ_HAVE_WINDOWS
151         const int rc = closesocket (_s);
152         wsa_assert (rc != SOCKET_ERROR);
153 #else
154         const int rc = ::close (_s);
155         errno_assert (rc == 0);
156 #endif
157         _socket->event_closed (
158           make_unconnected_connect_endpoint_pair (_endpoint), _s);
159         _s = retired_fd;
160     }
161 }
162 
in_event()163 void zmq::stream_connecter_base_t::in_event ()
164 {
165     //  We are not polling for incoming data, so we are actually called
166     //  because of error here. However, we can get error on out event as well
167     //  on some platforms, so we'll simply handle both events in the same way.
168     out_event ();
169 }
170 
create_engine(fd_t fd_,const std::string & local_address_)171 void zmq::stream_connecter_base_t::create_engine (
172   fd_t fd_, const std::string &local_address_)
173 {
174     const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
175                                              endpoint_type_connect);
176 
177     //  Create the engine object for this connection.
178     i_engine *engine;
179     if (options.raw_socket)
180         engine = new (std::nothrow) raw_engine_t (fd_, options, endpoint_pair);
181     else
182         engine = new (std::nothrow) zmtp_engine_t (fd_, options, endpoint_pair);
183     alloc_assert (engine);
184 
185     //  Attach the engine to the corresponding session object.
186     send_attach (_session, engine);
187 
188     //  Shut the connecter down.
189     terminate ();
190 
191     _socket->event_connected (endpoint_pair, fd_);
192 }
193 
timer_event(int id_)194 void zmq::stream_connecter_base_t::timer_event (int id_)
195 {
196     zmq_assert (id_ == reconnect_timer_id);
197     _reconnect_timer_started = false;
198     start_connecting ();
199 }
200