1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31
32 #include "vmci_listener.hpp"
33
34 #if defined ZMQ_HAVE_VMCI
35
36 #include <new>
37
38 #include "stream_engine.hpp"
39 #include "vmci_address.hpp"
40 #include "io_thread.hpp"
41 #include "session_base.hpp"
42 #include "config.hpp"
43 #include "err.hpp"
44 #include "ip.hpp"
45 #include "socket_base.hpp"
46 #include "vmci.hpp"
47
48 #if defined ZMQ_HAVE_WINDOWS
49 #include "windows.hpp"
50 #else
51 #include <unistd.h>
52 #include <fcntl.h>
53 #endif
54
vmci_listener_t(io_thread_t * io_thread_,socket_base_t * socket_,const options_t & options_)55 zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_,
56 socket_base_t *socket_,
57 const options_t &options_) :
58 own_t (io_thread_, options_),
59 io_object_t (io_thread_),
60 s (retired_fd),
61 socket (socket_)
62 {
63 }
64
~vmci_listener_t()65 zmq::vmci_listener_t::~vmci_listener_t ()
66 {
67 zmq_assert (s == retired_fd);
68 }
69
process_plug()70 void zmq::vmci_listener_t::process_plug ()
71 {
72 // Start polling for incoming connections.
73 handle = add_fd (s);
74 set_pollin (handle);
75 }
76
process_term(int linger_)77 void zmq::vmci_listener_t::process_term (int linger_)
78 {
79 rm_fd (handle);
80 close ();
81 own_t::process_term (linger_);
82 }
83
in_event()84 void zmq::vmci_listener_t::in_event ()
85 {
86 fd_t fd = accept ();
87
88 // If connection was reset by the peer in the meantime, just ignore it.
89 if (fd == retired_fd) {
90 socket->event_accept_failed (
91 make_unconnected_bind_endpoint_pair (endpoint), zmq_errno ());
92 return;
93 }
94
95 tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size,
96 options.vmci_buffer_min_size,
97 options.vmci_buffer_max_size);
98
99 if (options.vmci_connect_timeout > 0) {
100 #if defined ZMQ_HAVE_WINDOWS
101 tune_vmci_connect_timeout (this->get_ctx (), fd,
102 options.vmci_connect_timeout);
103 #else
104 struct timeval timeout = {0, options.vmci_connect_timeout * 1000};
105 tune_vmci_connect_timeout (this->get_ctx (), fd, timeout);
106 #endif
107 }
108
109 // Create the engine object for this connection.
110 stream_engine_t *engine = new (std::nothrow) stream_engine_t (
111 fd, options, make_unconnected_bind_endpoint_pair (endpoint));
112 alloc_assert (engine);
113
114 // Choose I/O thread to run connecter in. Given that we are already
115 // running in an I/O thread, there must be at least one available.
116 io_thread_t *io_thread = choose_io_thread (options.affinity);
117 zmq_assert (io_thread);
118
119 // Create and launch a session object.
120 session_base_t *session =
121 session_base_t::create (io_thread, false, socket, options, NULL);
122 errno_assert (session);
123 session->inc_seqnum ();
124 launch_child (session);
125 send_attach (session, engine, false);
126 socket->event_accepted (make_unconnected_bind_endpoint_pair (endpoint), fd);
127 }
128
get_local_address(std::string & addr_)129 int zmq::vmci_listener_t::get_local_address (std::string &addr_)
130 {
131 struct sockaddr_storage ss;
132 #ifdef ZMQ_HAVE_HPUX
133 int sl = sizeof (ss);
134 #else
135 socklen_t sl = sizeof (ss);
136 #endif
137 int rc = getsockname (s, (sockaddr *) &ss, &sl);
138 if (rc != 0) {
139 addr_.clear ();
140 return rc;
141 }
142
143 vmci_address_t addr ((struct sockaddr *) &ss, sl, this->get_ctx ());
144 return addr.to_string (addr_);
145 }
146
set_local_address(const char * addr_)147 int zmq::vmci_listener_t::set_local_address (const char *addr_)
148 {
149 // Create addr on stack for auto-cleanup
150 std::string addr (addr_);
151
152 // Initialise the address structure.
153 vmci_address_t address (this->get_ctx ());
154 int rc = address.resolve (addr.c_str ());
155 if (rc != 0)
156 return -1;
157
158 // Create a listening socket.
159 s =
160 open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
161 #ifdef ZMQ_HAVE_WINDOWS
162 if (s == INVALID_SOCKET) {
163 errno = wsa_error_to_errno (WSAGetLastError ());
164 return -1;
165 }
166 #if !defined _WIN32_WCE
167 // On Windows, preventing sockets to be inherited by child processes.
168 BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
169 win_assert (brc);
170 #endif
171 #else
172 if (s == -1)
173 return -1;
174 #endif
175
176 address.to_string (endpoint);
177
178 // Bind the socket.
179 rc = bind (s, address.addr (), address.addrlen ());
180 #ifdef ZMQ_HAVE_WINDOWS
181 if (rc == SOCKET_ERROR) {
182 errno = wsa_error_to_errno (WSAGetLastError ());
183 goto error;
184 }
185 #else
186 if (rc != 0)
187 goto error;
188 #endif
189
190 // Listen for incoming connections.
191 rc = listen (s, options.backlog);
192 #ifdef ZMQ_HAVE_WINDOWS
193 if (rc == SOCKET_ERROR) {
194 errno = wsa_error_to_errno (WSAGetLastError ());
195 goto error;
196 }
197 #else
198 if (rc != 0)
199 goto error;
200 #endif
201
202 socket->event_listening (make_unconnected_bind_endpoint_pair (endpoint), s);
203 return 0;
204
205 error:
206 int err = errno;
207 close ();
208 errno = err;
209 return -1;
210 }
211
close()212 void zmq::vmci_listener_t::close ()
213 {
214 zmq_assert (s != retired_fd);
215 #ifdef ZMQ_HAVE_WINDOWS
216 int rc = closesocket (s);
217 wsa_assert (rc != SOCKET_ERROR);
218 #else
219 int rc = ::close (s);
220 errno_assert (rc == 0);
221 #endif
222 socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s);
223 s = retired_fd;
224 }
225
accept()226 zmq::fd_t zmq::vmci_listener_t::accept ()
227 {
228 // Accept one connection and deal with different failure modes.
229 // The situation where connection cannot be accepted due to insufficient
230 // resources is considered valid and treated by ignoring the connection.
231 zmq_assert (s != retired_fd);
232 fd_t sock = ::accept (s, NULL, NULL);
233
234 #ifdef ZMQ_HAVE_WINDOWS
235 if (sock == INVALID_SOCKET) {
236 wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK
237 || WSAGetLastError () == WSAECONNRESET
238 || WSAGetLastError () == WSAEMFILE
239 || WSAGetLastError () == WSAENOBUFS);
240 return retired_fd;
241 }
242 #if !defined _WIN32_WCE
243 // On Windows, preventing sockets to be inherited by child processes.
244 BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
245 win_assert (brc);
246 #endif
247 #else
248 if (sock == -1) {
249 errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
250 || errno == ECONNABORTED || errno == EPROTO
251 || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
252 || errno == ENFILE);
253 return retired_fd;
254 }
255 #endif
256
257 // Race condition can cause socket not to be closed (if fork happens
258 // between accept and this point).
259 #ifdef FD_CLOEXEC
260 int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
261 errno_assert (rc != -1);
262 #endif
263
264 return sock;
265 }
266
267 #endif
268