1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "poller.hpp"
32 #include "polling_util.hpp"
33 
34 #if defined ZMQ_POLL_BASED_ON_POLL
35 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
36 #include <poll.h>
37 #endif
38 #elif defined ZMQ_POLL_BASED_ON_SELECT
39 #if defined ZMQ_HAVE_WINDOWS
40 #elif defined ZMQ_HAVE_HPUX
41 #include <sys/param.h>
42 #include <sys/types.h>
43 #include <sys/time.h>
44 #elif defined ZMQ_HAVE_OPENVMS
45 #include <sys/types.h>
46 #include <sys/time.h>
47 #elif defined ZMQ_HAVE_VXWORKS
48 #include <sys/types.h>
49 #include <sys/time.h>
50 #include <sockLib.h>
51 #include <strings.h>
52 #else
53 #include <sys/select.h>
54 #endif
55 #endif
56 
57 #include "signaler.hpp"
58 #include "likely.hpp"
59 #include "stdint.hpp"
60 #include "config.hpp"
61 #include "err.hpp"
62 #include "fd.hpp"
63 #include "ip.hpp"
64 #include "tcp.hpp"
65 
66 #if !defined ZMQ_HAVE_WINDOWS
67 #include <unistd.h>
68 #include <netinet/tcp.h>
69 #include <sys/types.h>
70 #include <sys/socket.h>
71 #endif
72 
73 #if !defined(ZMQ_HAVE_WINDOWS)
74 // Helper to sleep for specific number of milliseconds (or until signal)
75 //
sleep_ms(unsigned int ms_)76 static int sleep_ms (unsigned int ms_)
77 {
78     if (ms_ == 0)
79         return 0;
80 #if defined ZMQ_HAVE_ANDROID
81     usleep (ms_ * 1000);
82     return 0;
83 #elif defined ZMQ_HAVE_VXWORKS
84     struct timespec ns_;
85     ns_.tv_sec = ms_ / 1000;
86     ns_.tv_nsec = ms_ % 1000 * 1000000;
87     return nanosleep (&ns_, 0);
88 #else
89     return usleep (ms_ * 1000);
90 #endif
91 }
92 
93 // Helper to wait on close(), for non-blocking sockets, until it completes
94 // If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
95 // the overall timeout is reached.
96 //
close_wait_ms(int fd_,unsigned int max_ms_=2000)97 static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
98 {
99     unsigned int ms_so_far = 0;
100     const unsigned int min_step_ms = 1;
101     const unsigned int max_step_ms = 100;
102     const unsigned int step_ms =
103       std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);
104 
105     int rc = 0; // do not sleep on first attempt
106     do {
107         if (rc == -1 && errno == EAGAIN) {
108             sleep_ms (step_ms);
109             ms_so_far += step_ms;
110         }
111         rc = close (fd_);
112     } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
113 
114     return rc;
115 }
116 #endif
117 
signaler_t()118 zmq::signaler_t::signaler_t ()
119 {
120     //  Create the socketpair for signaling.
121     if (make_fdpair (&_r, &_w) == 0) {
122         unblock_socket (_w);
123         unblock_socket (_r);
124     }
125 #ifdef HAVE_FORK
126     pid = getpid ();
127 #endif
128 }
129 
130 // This might get run after some part of construction failed, leaving one or
131 // both of _r and _w retired_fd.
~signaler_t()132 zmq::signaler_t::~signaler_t ()
133 {
134 #if defined ZMQ_HAVE_EVENTFD
135     if (_r == retired_fd)
136         return;
137     int rc = close_wait_ms (_r);
138     errno_assert (rc == 0);
139 #elif defined ZMQ_HAVE_WINDOWS
140     if (_w != retired_fd) {
141         const struct linger so_linger = {1, 0};
142         int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER,
143                              reinterpret_cast<const char *> (&so_linger),
144                              sizeof so_linger);
145         //  Only check shutdown if WSASTARTUP was previously done
146         if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
147             wsa_assert (rc != SOCKET_ERROR);
148             rc = closesocket (_w);
149             wsa_assert (rc != SOCKET_ERROR);
150             if (_r == retired_fd)
151                 return;
152             rc = closesocket (_r);
153             wsa_assert (rc != SOCKET_ERROR);
154         }
155     }
156 #else
157     if (_w != retired_fd) {
158         int rc = close_wait_ms (_w);
159         errno_assert (rc == 0);
160     }
161     if (_r != retired_fd) {
162         int rc = close_wait_ms (_r);
163         errno_assert (rc == 0);
164     }
165 #endif
166 }
167 
get_fd() const168 zmq::fd_t zmq::signaler_t::get_fd () const
169 {
170     return _r;
171 }
172 
send()173 void zmq::signaler_t::send ()
174 {
175 #if defined HAVE_FORK
176     if (unlikely (pid != getpid ())) {
177         //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
178         return; // do not send anything in forked child context
179     }
180 #endif
181 #if defined ZMQ_HAVE_EVENTFD
182     const uint64_t inc = 1;
183     ssize_t sz = write (_w, &inc, sizeof (inc));
184     errno_assert (sz == sizeof (inc));
185 #elif defined ZMQ_HAVE_WINDOWS
186     const char dummy = 0;
187     int nbytes;
188     do {
189         nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
190         wsa_assert (nbytes != SOCKET_ERROR);
191         // wsa_assert does not abort on WSAEWOULDBLOCK. If we get this, we retry.
192     } while (nbytes == SOCKET_ERROR);
193     // Given the small size of dummy (should be 1) expect that send was able to send everything.
194     zmq_assert (nbytes == sizeof (dummy));
195 #elif defined ZMQ_HAVE_VXWORKS
196     unsigned char dummy = 0;
197     while (true) {
198         ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0);
199         if (unlikely (nbytes == -1 && errno == EINTR))
200             continue;
201 #if defined(HAVE_FORK)
202         if (unlikely (pid != getpid ())) {
203             //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
204             errno = EINTR;
205             break;
206         }
207 #endif
208         zmq_assert (nbytes == sizeof dummy);
209         break;
210     }
211 #else
212     unsigned char dummy = 0;
213     while (true) {
214         ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
215         if (unlikely (nbytes == -1 && errno == EINTR))
216             continue;
217 #if defined(HAVE_FORK)
218         if (unlikely (pid != getpid ())) {
219             //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
220             errno = EINTR;
221             break;
222         }
223 #endif
224         zmq_assert (nbytes == sizeof dummy);
225         break;
226     }
227 #endif
228 }
229 
wait(int timeout_) const230 int zmq::signaler_t::wait (int timeout_) const
231 {
232 #ifdef HAVE_FORK
233     if (unlikely (pid != getpid ())) {
234         // we have forked and the file descriptor is closed. Emulate an interrupt
235         // response.
236         //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
237         errno = EINTR;
238         return -1;
239     }
240 #endif
241 
242 #ifdef ZMQ_POLL_BASED_ON_POLL
243     struct pollfd pfd;
244     pfd.fd = _r;
245     pfd.events = POLLIN;
246     const int rc = poll (&pfd, 1, timeout_);
247     if (unlikely (rc < 0)) {
248         errno_assert (errno == EINTR);
249         return -1;
250     }
251     if (unlikely (rc == 0)) {
252         errno = EAGAIN;
253         return -1;
254     }
255 #ifdef HAVE_FORK
256     if (unlikely (pid != getpid ())) {
257         // we have forked and the file descriptor is closed. Emulate an interrupt
258         // response.
259         //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
260         errno = EINTR;
261         return -1;
262     }
263 #endif
264     zmq_assert (rc == 1);
265     zmq_assert (pfd.revents & POLLIN);
266     return 0;
267 
268 #elif defined ZMQ_POLL_BASED_ON_SELECT
269 
270     optimized_fd_set_t fds (1);
271     FD_ZERO (fds.get ());
272     FD_SET (_r, fds.get ());
273     struct timeval timeout;
274     if (timeout_ >= 0) {
275         timeout.tv_sec = timeout_ / 1000;
276         timeout.tv_usec = timeout_ % 1000 * 1000;
277     }
278 #ifdef ZMQ_HAVE_WINDOWS
279     int rc =
280       select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
281     wsa_assert (rc != SOCKET_ERROR);
282 #else
283     int rc =
284       select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
285     if (unlikely (rc < 0)) {
286         errno_assert (errno == EINTR);
287         return -1;
288     }
289 #endif
290     if (unlikely (rc == 0)) {
291         errno = EAGAIN;
292         return -1;
293     }
294     zmq_assert (rc == 1);
295     return 0;
296 
297 #else
298 #error
299 #endif
300 }
301 
recv()302 void zmq::signaler_t::recv ()
303 {
304 //  Attempt to read a signal.
305 #if defined ZMQ_HAVE_EVENTFD
306     uint64_t dummy;
307     ssize_t sz = read (_r, &dummy, sizeof (dummy));
308     errno_assert (sz == sizeof (dummy));
309 
310     //  If we accidentally grabbed the next signal(s) along with the current
311     //  one, return it back to the eventfd object.
312     if (unlikely (dummy > 1)) {
313         const uint64_t inc = dummy - 1;
314         ssize_t sz2 = write (_w, &inc, sizeof (inc));
315         errno_assert (sz2 == sizeof (inc));
316         return;
317     }
318 
319     zmq_assert (dummy == 1);
320 #else
321     unsigned char dummy;
322 #if defined ZMQ_HAVE_WINDOWS
323     const int nbytes =
324       ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
325     wsa_assert (nbytes != SOCKET_ERROR);
326 #elif defined ZMQ_HAVE_VXWORKS
327     ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
328     errno_assert (nbytes >= 0);
329 #else
330     ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
331     errno_assert (nbytes >= 0);
332 #endif
333     zmq_assert (nbytes == sizeof (dummy));
334     zmq_assert (dummy == 0);
335 #endif
336 }
337 
recv_failable()338 int zmq::signaler_t::recv_failable ()
339 {
340 //  Attempt to read a signal.
341 #if defined ZMQ_HAVE_EVENTFD
342     uint64_t dummy;
343     ssize_t sz = read (_r, &dummy, sizeof (dummy));
344     if (sz == -1) {
345         errno_assert (errno == EAGAIN);
346         return -1;
347     }
348     errno_assert (sz == sizeof (dummy));
349 
350     //  If we accidentally grabbed the next signal(s) along with the current
351     //  one, return it back to the eventfd object.
352     if (unlikely (dummy > 1)) {
353         const uint64_t inc = dummy - 1;
354         ssize_t sz2 = write (_w, &inc, sizeof (inc));
355         errno_assert (sz2 == sizeof (inc));
356         return 0;
357     }
358 
359     zmq_assert (dummy == 1);
360 
361 #else
362     unsigned char dummy;
363 #if defined ZMQ_HAVE_WINDOWS
364     const int nbytes =
365       ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
366     if (nbytes == SOCKET_ERROR) {
367         const int last_error = WSAGetLastError ();
368         if (last_error == WSAEWOULDBLOCK) {
369             errno = EAGAIN;
370             return -1;
371         }
372         wsa_assert (last_error == WSAEWOULDBLOCK);
373     }
374 #elif defined ZMQ_HAVE_VXWORKS
375     ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
376     if (nbytes == -1) {
377         if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
378             errno = EAGAIN;
379             return -1;
380         }
381         errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
382                       || errno == EINTR);
383     }
384 #else
385     ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
386     if (nbytes == -1) {
387         if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
388             errno = EAGAIN;
389             return -1;
390         }
391         errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
392                       || errno == EINTR);
393     }
394 #endif
395     zmq_assert (nbytes == sizeof (dummy));
396     zmq_assert (dummy == 0);
397 #endif
398     return 0;
399 }
400 
valid() const401 bool zmq::signaler_t::valid () const
402 {
403     return _w != retired_fd;
404 }
405 
406 #ifdef HAVE_FORK
forked()407 void zmq::signaler_t::forked ()
408 {
409     //  Close file descriptors created in the parent and create new pair
410     close (_r);
411     close (_w);
412     make_fdpair (&_r, &_w);
413 }
414 #endif
415