1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "select.hpp"
32 #if defined ZMQ_IOTHREAD_POLLER_USE_SELECT
33 
34 #if defined ZMQ_HAVE_WINDOWS
35 #elif defined ZMQ_HAVE_HPUX
36 #include <sys/param.h>
37 #include <sys/types.h>
38 #include <sys/time.h>
39 #elif defined ZMQ_HAVE_OPENVMS
40 #include <sys/types.h>
41 #include <sys/time.h>
42 #elif defined ZMQ_HAVE_VXWORKS
43 #include <sys/types.h>
44 #include <sys/time.h>
45 #include <strings.h>
46 #else
47 #include <sys/select.h>
48 #endif
49 
50 #include "err.hpp"
51 #include "config.hpp"
52 #include "i_poll_events.hpp"
53 
54 #include <algorithm>
55 #include <limits>
56 #include <climits>
57 
//  Builds a select ()-based poller on top of worker_poller_base_t.
//  Windows builds start with no family being iterated and with every slot
//  of the fd -> family cache marked as unused; all other builds start with
//  no known highest descriptor.
zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
    worker_poller_base_t (ctx_),
#if defined ZMQ_HAVE_WINDOWS
    //  Using end () as the "nothing is being iterated" marker is fine as
    //  long as the map is not cleared.
    _current_family_entry_it (_family_entries.end ())
#else
    _max_fd (retired_fd)
#endif
{
#if defined ZMQ_HAVE_WINDOWS
    //  A slot whose descriptor is retired_fd counts as unused.
    size_t slot = fd_family_cache_size;
    while (slot-- > 0)
        _fd_family_cache[slot] = std::make_pair (retired_fd, 0);
#endif
}
72 
~select_t()73 zmq::select_t::~select_t ()
74 {
75     stop_worker ();
76 }
77 
add_fd(fd_t fd_,i_poll_events * events_)78 zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
79 {
80     check_thread ();
81     zmq_assert (fd_ != retired_fd);
82 
83     fd_entry_t fd_entry;
84     fd_entry.fd = fd_;
85     fd_entry.events = events_;
86 
87 #if defined ZMQ_HAVE_WINDOWS
88     u_short family = get_fd_family (fd_);
89     wsa_assert (family != AF_UNSPEC);
90     family_entry_t &family_entry = _family_entries[family];
91 #else
92     family_entry_t &family_entry = _family_entry;
93 #endif
94     family_entry.fd_entries.push_back (fd_entry);
95     FD_SET (fd_, &family_entry.fds_set.error);
96 
97 #if !defined ZMQ_HAVE_WINDOWS
98     if (fd_ > _max_fd)
99         _max_fd = fd_;
100 #endif
101 
102     adjust_load (1);
103 
104     return fd_;
105 }
106 
107 zmq::select_t::fd_entries_t::iterator
find_fd_entry_by_handle(fd_entries_t & fd_entries_,handle_t handle_)108 zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries_,
109                                         handle_t handle_)
110 {
111     fd_entries_t::iterator fd_entry_it;
112     for (fd_entry_it = fd_entries_.begin (); fd_entry_it != fd_entries_.end ();
113          ++fd_entry_it)
114         if (fd_entry_it->fd == handle_)
115             break;
116 
117     return fd_entry_it;
118 }
119 
trigger_events(const fd_entries_t & fd_entries_,const fds_set_t & local_fds_set_,int event_count_)120 void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
121                                     const fds_set_t &local_fds_set_,
122                                     int event_count_)
123 {
124     //  Size is cached to avoid iteration through recently added descriptors.
125     for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
126          i < size && event_count_ > 0; ++i) {
127         //  fd_entries_[i] may not be stored, since calls to
128         //  in_event/out_event may reallocate the vector
129 
130         if (is_retired_fd (fd_entries_[i]))
131             continue;
132 
133         if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.read)) {
134             fd_entries_[i].events->in_event ();
135             --event_count_;
136         }
137 
138         //  TODO: can the is_retired_fd be true at this point? if it
139         //  was retired before, we would already have continued, and I
140         //  don't see where it might have been modified
141         //  And if rc == 0, we can break instead of continuing
142         if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
143             continue;
144 
145         if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.write)) {
146             fd_entries_[i].events->out_event ();
147             --event_count_;
148         }
149 
150         //  TODO: same as above
151         if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
152             continue;
153 
154         if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.error)) {
155             fd_entries_[i].events->in_event ();
156             --event_count_;
157         }
158     }
159 }
160 
#if defined ZMQ_HAVE_WINDOWS
//  Tries to remove descriptor handle_ from the family referred to by
//  family_entry_it_.
//  Returns 1 if the descriptor was found (and removed or marked retired),
//  0 if the family does not contain it.
int zmq::select_t::try_retire_fd_entry (
  family_entries_t::iterator family_entry_it_, zmq::fd_t &handle_)
{
    family_entry_t &owner = family_entry_it_->second;
    fd_entries_t &entries = owner.fd_entries;

    const fd_entries_t::iterator match =
      find_fd_entry_by_handle (entries, handle_);
    if (match == entries.end ())
        return 0;

    zmq_assert (match->fd != retired_fd);

    const bool being_iterated = family_entry_it_ == _current_family_entry_it;
    if (being_iterated) {
        //  The family is currently being iterated, so only mark the entry
        //  as retired. It will be cleaned up at the end of the iteration.
        //  See zmq::select_t::loop
        match->fd = retired_fd;
        owner.has_retired = true;
    } else {
        //  The family is not being iterated and can be safely modified
        //  in-place, so later it can be skipped without re-verifying its
        //  content.
        entries.erase (match);
    }

    owner.fds_set.remove_fd (handle_);
    return 1;
}
#endif
191 
rm_fd(handle_t handle_)192 void zmq::select_t::rm_fd (handle_t handle_)
193 {
194     check_thread ();
195     int retired = 0;
196 #if defined ZMQ_HAVE_WINDOWS
197     u_short family = get_fd_family (handle_);
198     if (family != AF_UNSPEC) {
199         family_entries_t::iterator family_entry_it =
200           _family_entries.find (family);
201 
202         retired += try_retire_fd_entry (family_entry_it, handle_);
203     } else {
204         //  get_fd_family may fail and return AF_UNSPEC if the socket was not
205         //  successfully connected. In that case, we need to look for the
206         //  socket in all family_entries.
207         family_entries_t::iterator end = _family_entries.end ();
208         for (family_entries_t::iterator family_entry_it =
209                _family_entries.begin ();
210              family_entry_it != end; ++family_entry_it) {
211             if (retired += try_retire_fd_entry (family_entry_it, handle_)) {
212                 break;
213             }
214         }
215     }
216 #else
217     fd_entries_t::iterator fd_entry_it =
218       find_fd_entry_by_handle (_family_entry.fd_entries, handle_);
219     assert (fd_entry_it != _family_entry.fd_entries.end ());
220 
221     zmq_assert (fd_entry_it->fd != retired_fd);
222     fd_entry_it->fd = retired_fd;
223     _family_entry.fds_set.remove_fd (handle_);
224 
225     ++retired;
226 
227     if (handle_ == _max_fd) {
228         _max_fd = retired_fd;
229         for (fd_entry_it = _family_entry.fd_entries.begin ();
230              fd_entry_it != _family_entry.fd_entries.end (); ++fd_entry_it)
231             if (fd_entry_it->fd > _max_fd)
232                 _max_fd = fd_entry_it->fd;
233     }
234 
235     _family_entry.has_retired = true;
236 #endif
237     zmq_assert (retired == 1);
238     adjust_load (-1);
239 }
240 
set_pollin(handle_t handle_)241 void zmq::select_t::set_pollin (handle_t handle_)
242 {
243     check_thread ();
244 #if defined ZMQ_HAVE_WINDOWS
245     u_short family = get_fd_family (handle_);
246     wsa_assert (family != AF_UNSPEC);
247     family_entry_t &family_entry = _family_entries[family];
248 #else
249     family_entry_t &family_entry = _family_entry;
250 #endif
251     FD_SET (handle_, &family_entry.fds_set.read);
252 }
253 
reset_pollin(handle_t handle_)254 void zmq::select_t::reset_pollin (handle_t handle_)
255 {
256     check_thread ();
257 #if defined ZMQ_HAVE_WINDOWS
258     u_short family = get_fd_family (handle_);
259     wsa_assert (family != AF_UNSPEC);
260     family_entry_t &family_entry = _family_entries[family];
261 #else
262     family_entry_t &family_entry = _family_entry;
263 #endif
264     FD_CLR (handle_, &family_entry.fds_set.read);
265 }
266 
set_pollout(handle_t handle_)267 void zmq::select_t::set_pollout (handle_t handle_)
268 {
269     check_thread ();
270 #if defined ZMQ_HAVE_WINDOWS
271     u_short family = get_fd_family (handle_);
272     wsa_assert (family != AF_UNSPEC);
273     family_entry_t &family_entry = _family_entries[family];
274 #else
275     family_entry_t &family_entry = _family_entry;
276 #endif
277     FD_SET (handle_, &family_entry.fds_set.write);
278 }
279 
reset_pollout(handle_t handle_)280 void zmq::select_t::reset_pollout (handle_t handle_)
281 {
282     check_thread ();
283 #if defined ZMQ_HAVE_WINDOWS
284     u_short family = get_fd_family (handle_);
285     wsa_assert (family != AF_UNSPEC);
286     family_entry_t &family_entry = _family_entries[family];
287 #else
288     family_entry_t &family_entry = _family_entry;
289 #endif
290     FD_CLR (handle_, &family_entry.fds_set.write);
291 }
292 
stop()293 void zmq::select_t::stop ()
294 {
295     check_thread ();
296     //  no-op... thread is stopped when no more fds or timers are registered
297 }
298 
max_fds()299 int zmq::select_t::max_fds ()
300 {
301     return FD_SETSIZE;
302 }
303 
loop()304 void zmq::select_t::loop ()
305 {
306     while (true) {
307         //  Execute any due timers.
308         int timeout = static_cast<int> (execute_timers ());
309 
310         cleanup_retired ();
311 
312 #ifdef _WIN32
313         if (_family_entries.empty ()) {
314 #else
315         if (_family_entry.fd_entries.empty ()) {
316 #endif
317             zmq_assert (get_load () == 0);
318 
319             if (timeout == 0)
320                 break;
321 
322             // TODO sleep for timeout
323             continue;
324         }
325 
326 #if defined ZMQ_HAVE_OSX
327         struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
328 #else
329         struct timeval tv = {static_cast<long> (timeout / 1000),
330                              static_cast<long> (timeout % 1000 * 1000)};
331 #endif
332 
333 #if defined ZMQ_HAVE_WINDOWS
334         /*
335             On Windows select does not allow to mix descriptors from different
336             service providers. It seems to work for AF_INET and AF_INET6,
337             but fails for AF_INET and VMCI. The workaround is to use
338             WSAEventSelect and WSAWaitForMultipleEvents to wait, then use
339             select to find out what actually changed. WSAWaitForMultipleEvents
340             cannot be used alone, because it does not support more than 64 events
341             which is not enough.
342 
343             To reduce unnecessary overhead, WSA is only used when there are more
344             than one family. Moreover, AF_INET and AF_INET6 are considered the same
345             family because Windows seems to handle them properly.
346             See get_fd_family for details.
347         */
348 
349         //  If there is just one family, there is no reason to use WSA events.
350         int rc = 0;
351         const bool use_wsa_events = _family_entries.size () > 1;
352         if (use_wsa_events) {
353             // TODO: I don't really understand why we are doing this. If any of
354             // the events was signaled, we will call select for each fd_family
355             // afterwards. The only benefit is if none of the events was
356             // signaled, then we continue early.
357             // IMHO, either WSAEventSelect/WSAWaitForMultipleEvents or select
358             // should be used, but not both
359 
360             wsa_events_t wsa_events;
361 
362             for (family_entries_t::iterator family_entry_it =
363                    _family_entries.begin ();
364                  family_entry_it != _family_entries.end (); ++family_entry_it) {
365                 family_entry_t &family_entry = family_entry_it->second;
366 
367                 for (fd_entries_t::iterator fd_entry_it =
368                        family_entry.fd_entries.begin ();
369                      fd_entry_it != family_entry.fd_entries.end ();
370                      ++fd_entry_it) {
371                     fd_t fd = fd_entry_it->fd;
372 
373                     //  http://stackoverflow.com/q/35043420/188530
374                     if (FD_ISSET (fd, &family_entry.fds_set.read)
375                         && FD_ISSET (fd, &family_entry.fds_set.write))
376                         rc = WSAEventSelect (fd, wsa_events.events[3],
377                                              FD_READ | FD_ACCEPT | FD_CLOSE
378                                                | FD_WRITE | FD_CONNECT);
379                     else if (FD_ISSET (fd, &family_entry.fds_set.read))
380                         rc = WSAEventSelect (fd, wsa_events.events[0],
381                                              FD_READ | FD_ACCEPT | FD_CLOSE);
382                     else if (FD_ISSET (fd, &family_entry.fds_set.write))
383                         rc = WSAEventSelect (fd, wsa_events.events[1],
384                                              FD_WRITE | FD_CONNECT);
385                     else
386                         rc = 0;
387 
388                     wsa_assert (rc != SOCKET_ERROR);
389                 }
390             }
391 
392             rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
393                                            timeout ? timeout : INFINITE, FALSE);
394             wsa_assert (rc != (int) WSA_WAIT_FAILED);
395             zmq_assert (rc != WSA_WAIT_IO_COMPLETION);
396 
397             if (rc == WSA_WAIT_TIMEOUT)
398                 continue;
399         }
400 
401         for (_current_family_entry_it = _family_entries.begin ();
402              _current_family_entry_it != _family_entries.end ();
403              ++_current_family_entry_it) {
404             family_entry_t &family_entry = _current_family_entry_it->second;
405 
406 
407             if (use_wsa_events) {
408                 //  There is no reason to wait again after WSAWaitForMultipleEvents.
409                 //  Simply collect what is ready.
410                 struct timeval tv_nodelay = {0, 0};
411                 select_family_entry (family_entry, 0, true, tv_nodelay);
412             } else {
413                 select_family_entry (family_entry, 0, timeout > 0, tv);
414             }
415         }
416 #else
417         select_family_entry (_family_entry, _max_fd + 1, timeout > 0, tv);
418 #endif
419     }
420 }
421 
422 void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
423                                          const int max_fd_,
424                                          const bool use_timeout_,
425                                          struct timeval &tv_)
426 {
427     //  select will fail when run with empty sets.
428     fd_entries_t &fd_entries = family_entry_.fd_entries;
429     if (fd_entries.empty ())
430         return;
431 
432     fds_set_t local_fds_set = family_entry_.fds_set;
433     int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
434                      &local_fds_set.error, use_timeout_ ? &tv_ : NULL);
435 
436 #if defined ZMQ_HAVE_WINDOWS
437     wsa_assert (rc != SOCKET_ERROR);
438 #else
439     if (rc == -1) {
440         errno_assert (errno == EINTR);
441         return;
442     }
443 #endif
444 
445     trigger_events (fd_entries, local_fds_set, rc);
446 
447     cleanup_retired (family_entry_);
448 }
449 
450 zmq::select_t::fds_set_t::fds_set_t ()
451 {
452     FD_ZERO (&read);
453     FD_ZERO (&write);
454     FD_ZERO (&error);
455 }
456 
457 zmq::select_t::fds_set_t::fds_set_t (const fds_set_t &other_)
458 {
459 #if defined ZMQ_HAVE_WINDOWS
460     // On Windows we don't need to copy the whole fd_set.
461     // SOCKETS are continuous from the beginning of fd_array in fd_set.
462     // We just need to copy fd_count elements of fd_array.
463     // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
464     memcpy (&read, &other_.read,
465             (char *) (other_.read.fd_array + other_.read.fd_count)
466               - (char *) &other_.read);
467     memcpy (&write, &other_.write,
468             (char *) (other_.write.fd_array + other_.write.fd_count)
469               - (char *) &other_.write);
470     memcpy (&error, &other_.error,
471             (char *) (other_.error.fd_array + other_.error.fd_count)
472               - (char *) &other_.error);
473 #else
474     memcpy (&read, &other_.read, sizeof other_.read);
475     memcpy (&write, &other_.write, sizeof other_.write);
476     memcpy (&error, &other_.error, sizeof other_.error);
477 #endif
478 }
479 
480 zmq::select_t::fds_set_t &zmq::select_t::fds_set_t::
481 operator= (const fds_set_t &other_)
482 {
483 #if defined ZMQ_HAVE_WINDOWS
484     // On Windows we don't need to copy the whole fd_set.
485     // SOCKETS are continuous from the beginning of fd_array in fd_set.
486     // We just need to copy fd_count elements of fd_array.
487     // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
488     memcpy (&read, &other_.read,
489             (char *) (other_.read.fd_array + other_.read.fd_count)
490               - (char *) &other_.read);
491     memcpy (&write, &other_.write,
492             (char *) (other_.write.fd_array + other_.write.fd_count)
493               - (char *) &other_.write);
494     memcpy (&error, &other_.error,
495             (char *) (other_.error.fd_array + other_.error.fd_count)
496               - (char *) &other_.error);
497 #else
498     memcpy (&read, &other_.read, sizeof other_.read);
499     memcpy (&write, &other_.write, sizeof other_.write);
500     memcpy (&error, &other_.error, sizeof other_.error);
501 #endif
502     return *this;
503 }
504 
505 void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
506 {
507     FD_CLR (fd_, &read);
508     FD_CLR (fd_, &write);
509     FD_CLR (fd_, &error);
510 }
511 
512 bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
513 {
514     if (family_entry_.has_retired) {
515         family_entry_.has_retired = false;
516         family_entry_.fd_entries.erase (
517           std::remove_if (family_entry_.fd_entries.begin (),
518                           family_entry_.fd_entries.end (), is_retired_fd),
519           family_entry_.fd_entries.end ());
520     }
521     return family_entry_.fd_entries.empty ();
522 }
523 
524 void zmq::select_t::cleanup_retired ()
525 {
526 #ifdef _WIN32
527     for (family_entries_t::iterator it = _family_entries.begin ();
528          it != _family_entries.end ();) {
529         if (cleanup_retired (it->second))
530             it = _family_entries.erase (it);
531         else
532             ++it;
533     }
534 #else
535     cleanup_retired (_family_entry);
536 #endif
537 }
538 
539 bool zmq::select_t::is_retired_fd (const fd_entry_t &entry_)
540 {
541     return entry_.fd == retired_fd;
542 }
543 
544 zmq::select_t::family_entry_t::family_entry_t () : has_retired (false)
545 {
546 }
547 
548 
549 #if defined ZMQ_HAVE_WINDOWS
550 u_short zmq::select_t::get_fd_family (fd_t fd_)
551 {
552     // cache the results of determine_fd_family, as this is frequently called
553     // for the same sockets, and determine_fd_family is expensive
554     size_t i;
555     for (i = 0; i < fd_family_cache_size; ++i) {
556         const std::pair<fd_t, u_short> &entry = _fd_family_cache[i];
557         if (entry.first == fd_) {
558             return entry.second;
559         }
560         if (entry.first == retired_fd)
561             break;
562     }
563 
564     std::pair<fd_t, u_short> res =
565       std::make_pair (fd_, determine_fd_family (fd_));
566     if (i < fd_family_cache_size) {
567         _fd_family_cache[i] = res;
568     } else {
569         // just overwrite a random entry
570         // could be optimized by some LRU strategy
571         _fd_family_cache[rand () % fd_family_cache_size] = res;
572     }
573 
574     return res.second;
575 }
576 
577 u_short zmq::select_t::determine_fd_family (fd_t fd_)
578 {
579     //  Use sockaddr_storage instead of sockaddr to accommodate different structure sizes
580     sockaddr_storage addr = {0};
581     int addr_size = sizeof addr;
582 
583     int type;
584     int type_length = sizeof (int);
585 
586     int rc = getsockopt (fd_, SOL_SOCKET, SO_TYPE,
587                          reinterpret_cast<char *> (&type), &type_length);
588 
589     if (rc == 0) {
590         if (type == SOCK_DGRAM)
591             return AF_INET;
592 
593         rc =
594           getsockname (fd_, reinterpret_cast<sockaddr *> (&addr), &addr_size);
595 
596         //  AF_INET and AF_INET6 can be mixed in select
597         //  TODO: If proven otherwise, should simply return addr.sa_family
598         if (rc != SOCKET_ERROR)
599             return addr.ss_family == AF_INET6 ? AF_INET : addr.ss_family;
600     }
601 
602     return AF_UNSPEC;
603 }
604 
605 zmq::select_t::wsa_events_t::wsa_events_t ()
606 {
607     events[0] = WSACreateEvent ();
608     wsa_assert (events[0] != WSA_INVALID_EVENT);
609     events[1] = WSACreateEvent ();
610     wsa_assert (events[1] != WSA_INVALID_EVENT);
611     events[2] = WSACreateEvent ();
612     wsa_assert (events[2] != WSA_INVALID_EVENT);
613     events[3] = WSACreateEvent ();
614     wsa_assert (events[3] != WSA_INVALID_EVENT);
615 }
616 
617 zmq::select_t::wsa_events_t::~wsa_events_t ()
618 {
619     wsa_assert (WSACloseEvent (events[0]));
620     wsa_assert (WSACloseEvent (events[1]));
621     wsa_assert (WSACloseEvent (events[2]));
622     wsa_assert (WSACloseEvent (events[3]));
623 }
624 #endif
625 
626 #endif
627