1 /*
2 Copyright (c) 2007-2011 iMatix Corporation
3 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4
5 This file is part of 0MQ.
6
7 0MQ is free software; you can redistribute it and/or modify it under
8 the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
11
12 0MQ is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "platform.hpp"
22
23 // On AIX, poll.h has to be included before zmq.h to get consistent
24 // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
25 // instead of 'events' and 'revents' and defines macros to map from POSIX-y
26 // names to AIX-specific names).
27 #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
28 defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
29 defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
30 defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
31 defined ZMQ_HAVE_NETBSD
32 #include <poll.h>
33 #endif
34
35 #include "../include/zmq.h"
36 #include "../include/zmq_utils.h"
37
38 #include <string.h>
39 #include <errno.h>
40 #include <stdlib.h>
41 #include <new>
42
43 #include "device.hpp"
44 #include "socket_base.hpp"
45 #include "msg_content.hpp"
46 #include "stdint.hpp"
47 #include "config.hpp"
48 #include "likely.hpp"
49 #include "clock.hpp"
50 #include "ctx.hpp"
51 #include "err.hpp"
52 #include "fd.hpp"
53
54 #if !defined ZMQ_HAVE_WINDOWS
55 #include <unistd.h>
56 #endif
57
58 #if defined ZMQ_HAVE_OPENPGM
59 #define __PGM_WININT_H__
60 #include <pgm/pgm.h>
61
62 // TODO: OpenPGM redefines bool -- remove this once OpenPGM is fixed.
63 #if defined bool
64 #undef bool
65 #endif
66
67 #endif
68
zmq_version(int * major_,int * minor_,int * patch_)69 void zmq_version (int *major_, int *minor_, int *patch_)
70 {
71 *major_ = ZMQ_VERSION_MAJOR;
72 *minor_ = ZMQ_VERSION_MINOR;
73 *patch_ = ZMQ_VERSION_PATCH;
74 }
75
zmq_strerror(int errnum_)76 const char *zmq_strerror (int errnum_)
77 {
78 return zmq::errno_to_string (errnum_);
79 }
80
zmq_msg_init(zmq_msg_t * msg_)81 int zmq_msg_init (zmq_msg_t *msg_)
82 {
83 msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
84 msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
85 msg_->vsm_size = 0;
86 return 0;
87 }
88
zmq_msg_init_size(zmq_msg_t * msg_,size_t size_)89 int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
90 {
91 if (size_ <= ZMQ_MAX_VSM_SIZE) {
92 msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
93 msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
94 msg_->vsm_size = (uint8_t) size_;
95 }
96 else {
97 msg_->content =
98 (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
99 if (!msg_->content) {
100 errno = ENOMEM;
101 return -1;
102 }
103 msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
104
105 zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
106 content->data = (void*) (content + 1);
107 content->size = size_;
108 content->ffn = NULL;
109 content->hint = NULL;
110 new (&content->refcnt) zmq::atomic_counter_t ();
111 }
112 return 0;
113 }
114
zmq_msg_init_data(zmq_msg_t * msg_,void * data_,size_t size_,zmq_free_fn * ffn_,void * hint_)115 int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
116 zmq_free_fn *ffn_, void *hint_)
117 {
118 msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
119 alloc_assert (msg_->content);
120 msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
121 zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
122 content->data = data_;
123 content->size = size_;
124 content->ffn = ffn_;
125 content->hint = hint_;
126 new (&content->refcnt) zmq::atomic_counter_t ();
127 return 0;
128 }
129
zmq_msg_close(zmq_msg_t * msg_)130 int zmq_msg_close (zmq_msg_t *msg_)
131 {
132 // Check the validity tag.
133 if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) {
134 errno = EFAULT;
135 return -1;
136 }
137
138 // For VSMs and delimiters there are no resources to free.
139 if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
140 msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {
141
142 // If the content is not shared, or if it is shared and the reference.
143 // count has dropped to zero, deallocate it.
144 zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
145 if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
146
147 // We used "placement new" operator to initialize the reference.
148 // counter so we call its destructor now.
149 content->refcnt.~atomic_counter_t ();
150
151 if (content->ffn)
152 content->ffn (content->data, content->hint);
153 free (content);
154 }
155 }
156
157 // Remove the validity tag from the message.
158 msg_->flags = 0;
159
160 return 0;
161 }
162
zmq_msg_move(zmq_msg_t * dest_,zmq_msg_t * src_)163 int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
164 {
165 #if 0
166 // Check the validity tags.
167 if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
168 (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
169 errno = EFAULT;
170 return -1;
171 }
172 #endif
173 zmq_msg_close (dest_);
174 *dest_ = *src_;
175 zmq_msg_init (src_);
176 return 0;
177 }
178
zmq_msg_copy(zmq_msg_t * dest_,zmq_msg_t * src_)179 int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
180 {
181 // Check the validity tags.
182 if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
183 (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
184 errno = EFAULT;
185 return -1;
186 }
187
188 zmq_msg_close (dest_);
189
190 // VSMs and delimiters require no special handling.
191 if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
192 src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
193
194 // One reference is added to shared messages. Non-shared messages
195 // are turned into shared messages and reference count is set to 2.
196 zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
197 if (src_->flags & ZMQ_MSG_SHARED)
198 content->refcnt.add (1);
199 else {
200 src_->flags |= ZMQ_MSG_SHARED;
201 content->refcnt.set (2);
202 }
203 }
204
205 *dest_ = *src_;
206 return 0;
207 }
208
zmq_msg_data(zmq_msg_t * msg_)209 void *zmq_msg_data (zmq_msg_t *msg_)
210 {
211 zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
212
213 if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
214 return msg_->vsm_data;
215 if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
216 return NULL;
217
218 return ((zmq::msg_content_t*) msg_->content)->data;
219 }
220
zmq_msg_size(zmq_msg_t * msg_)221 size_t zmq_msg_size (zmq_msg_t *msg_)
222 {
223 zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
224
225 if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
226 return msg_->vsm_size;
227 if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
228 return 0;
229
230 return ((zmq::msg_content_t*) msg_->content)->size;
231 }
232
zmq_init(int io_threads_)233 void *zmq_init (int io_threads_)
234 {
235 if (io_threads_ < 0) {
236 errno = EINVAL;
237 return NULL;
238 }
239
240 #if defined ZMQ_HAVE_OPENPGM
241
242 // Init PGM transport. Ensure threading and timer are enabled. Find PGM
243 // protocol ID. Note that if you want to use gettimeofday and sleep for
244 // openPGM timing, set environment variables PGM_TIMER to "GTOD" and
245 // PGM_SLEEP to "USLEEP".
246 pgm_error_t *pgm_error = NULL;
247 const bool ok = pgm_init (&pgm_error);
248 if (ok != TRUE) {
249
250 // Invalid parameters don't set pgm_error_t
251 zmq_assert (pgm_error != NULL);
252 if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
253 pgm_error->code == PGM_ERROR_FAILED)) {
254
255 // Failed to access RTC or HPET device.
256 pgm_error_free (pgm_error);
257 errno = EINVAL;
258 return NULL;
259 }
260
261 // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
262 zmq_assert (false);
263 }
264 #endif
265
266 #ifdef ZMQ_HAVE_WINDOWS
267 // Intialise Windows sockets. Note that WSAStartup can be called multiple
268 // times given that WSACleanup will be called for each WSAStartup.
269 // We do this before the ctx constructor since its embedded mailbox_t
270 // object needs Winsock to be up and running.
271 WORD version_requested = MAKEWORD (2, 2);
272 WSADATA wsa_data;
273 int rc = WSAStartup (version_requested, &wsa_data);
274 zmq_assert (rc == 0);
275 zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
276 HIBYTE (wsa_data.wVersion) == 2);
277 #endif
278
279 // Create 0MQ context.
280 zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
281 alloc_assert (ctx);
282 return (void*) ctx;
283 }
284
zmq_term(void * ctx_)285 int zmq_term (void *ctx_)
286 {
287 if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
288 errno = EFAULT;
289 return -1;
290 }
291
292 int rc = ((zmq::ctx_t*) ctx_)->terminate ();
293 int en = errno;
294
295 #ifdef ZMQ_HAVE_WINDOWS
296 // On Windows, uninitialise socket layer.
297 rc = WSACleanup ();
298 wsa_assert (rc != SOCKET_ERROR);
299 #endif
300
301 #if defined ZMQ_HAVE_OPENPGM
302 // Shut down the OpenPGM library.
303 if (pgm_shutdown () != TRUE)
304 zmq_assert (false);
305 #endif
306
307 errno = en;
308 return rc;
309 }
310
zmq_socket(void * ctx_,int type_)311 void *zmq_socket (void *ctx_, int type_)
312 {
313 if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
314 errno = EFAULT;
315 return NULL;
316 }
317 return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
318 }
319
zmq_close(void * s_)320 int zmq_close (void *s_)
321 {
322 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
323 errno = ENOTSOCK;
324 return -1;
325 }
326 ((zmq::socket_base_t*) s_)->close ();
327 return 0;
328 }
329
zmq_setsockopt(void * s_,int option_,const void * optval_,size_t optvallen_)330 int zmq_setsockopt (void *s_, int option_, const void *optval_,
331 size_t optvallen_)
332 {
333 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
334 errno = ENOTSOCK;
335 return -1;
336 }
337 return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
338 optvallen_));
339 }
340
zmq_getsockopt(void * s_,int option_,void * optval_,size_t * optvallen_)341 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
342 {
343 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
344 errno = ENOTSOCK;
345 return -1;
346 }
347 return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
348 optvallen_));
349 }
350
zmq_bind(void * s_,const char * addr_)351 int zmq_bind (void *s_, const char *addr_)
352 {
353 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
354 errno = ENOTSOCK;
355 return -1;
356 }
357 return (((zmq::socket_base_t*) s_)->bind (addr_));
358 }
359
zmq_connect(void * s_,const char * addr_)360 int zmq_connect (void *s_, const char *addr_)
361 {
362 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
363 errno = ENOTSOCK;
364 return -1;
365 }
366 return (((zmq::socket_base_t*) s_)->connect (addr_));
367 }
368
zmq_send(void * s_,zmq_msg_t * msg_,int flags_)369 int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
370 {
371 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
372 errno = ENOTSOCK;
373 return -1;
374 }
375 return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
376 }
377
zmq_recv(void * s_,zmq_msg_t * msg_,int flags_)378 int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
379 {
380 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
381 errno = ENOTSOCK;
382 return -1;
383 }
384 return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
385 }
386
387 #if defined ZMQ_FORCE_SELECT
388 #define ZMQ_POLL_BASED_ON_SELECT
389 #elif defined ZMQ_FORCE_POLL
390 #define ZMQ_POLL_BASED_ON_POLL
391 #elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
392 defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
393 defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
394 defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
395 defined ZMQ_HAVE_NETBSD
396 #define ZMQ_POLL_BASED_ON_POLL
397 #elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
398 #define ZMQ_POLL_BASED_ON_SELECT
399 #endif
400
zmq_poll(zmq_pollitem_t * items_,int nitems_,long timeout_)401 int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
402 {
403 #if defined ZMQ_POLL_BASED_ON_POLL
404 if (unlikely (nitems_ < 0)) {
405 errno = EINVAL;
406 return -1;
407 }
408 if (unlikely (nitems_ == 0)) {
409 if (timeout_ == 0)
410 return 0;
411 #if defined ZMQ_HAVE_WINDOWS
412 Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
413 return 0;
414 #else
415 usleep (timeout_);
416 return 0;
417 #endif
418 }
419
420 if (!items_) {
421 errno = EFAULT;
422 return -1;
423 }
424
425 zmq::clock_t clock;
426 uint64_t now = 0;
427 uint64_t end = 0;
428
429 pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
430 alloc_assert (pollfds);
431
432 // Build pollset for poll () system call.
433 for (int i = 0; i != nitems_; i++) {
434
435 // If the poll item is a 0MQ socket, we poll on the file descriptor
436 // retrieved by the ZMQ_FD socket option.
437 if (items_ [i].socket) {
438 size_t zmq_fd_size = sizeof (zmq::fd_t);
439 if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
440 &zmq_fd_size) == -1) {
441 free (pollfds);
442 return -1;
443 }
444 pollfds [i].events = items_ [i].events ? POLLIN : 0;
445 }
446 // Else, the poll item is a raw file descriptor. Just convert the
447 // events to normal POLLIN/POLLOUT for poll ().
448 else {
449 pollfds [i].fd = items_ [i].fd;
450 pollfds [i].events =
451 (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
452 (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
453 }
454 }
455
456 bool first_pass = true;
457 int nevents = 0;
458
459 while (true) {
460
461 // Compute the timeout for the subsequent poll.
462 int timeout;
463 if (first_pass)
464 timeout = 0;
465 else if (timeout_ < 0)
466 timeout = -1;
467 else
468 timeout = end - now;
469
470 // Wait for events.
471 while (true) {
472 int rc = poll (pollfds, nitems_, timeout);
473 if (rc == -1 && errno == EINTR) {
474 free (pollfds);
475 return -1;
476 }
477 errno_assert (rc >= 0);
478 break;
479 }
480
481 // Check for the events.
482 for (int i = 0; i != nitems_; i++) {
483
484 items_ [i].revents = 0;
485
486 // The poll item is a 0MQ socket. Retrieve pending events
487 // using the ZMQ_EVENTS socket option.
488 if (items_ [i].socket) {
489 size_t zmq_events_size = sizeof (uint32_t);
490 uint32_t zmq_events;
491 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
492 &zmq_events_size) == -1) {
493 free (pollfds);
494 return -1;
495 }
496 if ((items_ [i].events & ZMQ_POLLOUT) &&
497 (zmq_events & ZMQ_POLLOUT))
498 items_ [i].revents |= ZMQ_POLLOUT;
499 if ((items_ [i].events & ZMQ_POLLIN) &&
500 (zmq_events & ZMQ_POLLIN))
501 items_ [i].revents |= ZMQ_POLLIN;
502 }
503 // Else, the poll item is a raw file descriptor, simply convert
504 // the events to zmq_pollitem_t-style format.
505 else {
506 if (pollfds [i].revents & POLLIN)
507 items_ [i].revents |= ZMQ_POLLIN;
508 if (pollfds [i].revents & POLLOUT)
509 items_ [i].revents |= ZMQ_POLLOUT;
510 if (pollfds [i].revents & ~(POLLIN | POLLOUT))
511 items_ [i].revents |= ZMQ_POLLERR;
512 }
513
514 if (items_ [i].revents)
515 nevents++;
516 }
517
518 // If timout is zero, exit immediately whether there are events or not.
519 if (timeout_ == 0)
520 break;
521
522 // If there are events to return, we can exit immediately.
523 if (nevents)
524 break;
525
526 // At this point we are meant to wait for events but there are none.
527 // If timeout is infinite we can just loop until we get some events.
528 if (timeout_ < 0) {
529 if (first_pass)
530 first_pass = false;
531 continue;
532 }
533
534 // The timeout is finite and there are no events. In the first pass
535 // we get a timestamp of when the polling have begun. (We assume that
536 // first pass have taken negligible time). We also compute the time
537 // when the polling should time out.
538 if (first_pass) {
539 now = clock.now_ms ();
540 end = now + (timeout_ / 1000);
541 if (now == end)
542 break;
543 first_pass = false;
544 continue;
545 }
546
547 // Find out whether timeout have expired.
548 now = clock.now_ms ();
549 if (now >= end)
550 break;
551 }
552
553 free (pollfds);
554 return nevents;
555
556 #elif defined ZMQ_POLL_BASED_ON_SELECT
557
558 if (unlikely (nitems_ < 0)) {
559 errno = EINVAL;
560 return -1;
561 }
562 if (unlikely (nitems_ == 0)) {
563 if (timeout_ == 0)
564 return 0;
565 #if defined ZMQ_HAVE_WINDOWS
566 Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
567 return 0;
568 #else
569 usleep (timeout_);
570 return 0;
571 #endif
572 }
573
574 if (!items_) {
575 errno = EFAULT;
576 return -1;
577 }
578
579 zmq::clock_t clock;
580 uint64_t now = 0;
581 uint64_t end = 0;
582
583 // Ensure we do not attempt to select () on more than FD_SETSIZE
584 // file descriptors.
585 zmq_assert (nitems_ <= FD_SETSIZE);
586
587 fd_set pollset_in;
588 FD_ZERO (&pollset_in);
589 fd_set pollset_out;
590 FD_ZERO (&pollset_out);
591 fd_set pollset_err;
592 FD_ZERO (&pollset_err);
593
594 zmq::fd_t maxfd = 0;
595
596 // Build the fd_sets for passing to select ().
597 for (int i = 0; i != nitems_; i++) {
598
599 // If the poll item is a 0MQ socket we are interested in input on the
600 // notification file descriptor retrieved by the ZMQ_FD socket option.
601 if (items_ [i].socket) {
602 size_t zmq_fd_size = sizeof (zmq::fd_t);
603 zmq::fd_t notify_fd;
604 if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd,
605 &zmq_fd_size) == -1)
606 return -1;
607 if (items_ [i].events) {
608 FD_SET (notify_fd, &pollset_in);
609 if (maxfd < notify_fd)
610 maxfd = notify_fd;
611 }
612 }
613 // Else, the poll item is a raw file descriptor. Convert the poll item
614 // events to the appropriate fd_sets.
615 else {
616 if (items_ [i].events & ZMQ_POLLIN)
617 FD_SET (items_ [i].fd, &pollset_in);
618 if (items_ [i].events & ZMQ_POLLOUT)
619 FD_SET (items_ [i].fd, &pollset_out);
620 if (items_ [i].events & ZMQ_POLLERR)
621 FD_SET (items_ [i].fd, &pollset_err);
622 if (maxfd < items_ [i].fd)
623 maxfd = items_ [i].fd;
624 }
625 }
626
627 bool first_pass = true;
628 int nevents = 0;
629 fd_set inset, outset, errset;
630
631 while (true) {
632
633 // Compute the timeout for the subsequent poll.
634 timeval timeout;
635 timeval *ptimeout;
636 if (first_pass) {
637 timeout.tv_sec = 0;
638 timeout.tv_usec = 0;
639 ptimeout = &timeout;
640 }
641 else if (timeout_ < 0)
642 ptimeout = NULL;
643 else {
644 timeout.tv_sec = (long) ((end - now) / 1000);
645 timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
646 ptimeout = &timeout;
647 }
648
649 // Wait for events. Ignore interrupts if there's infinite timeout.
650 while (true) {
651 memcpy (&inset, &pollset_in, sizeof (fd_set));
652 memcpy (&outset, &pollset_out, sizeof (fd_set));
653 memcpy (&errset, &pollset_err, sizeof (fd_set));
654 #if defined ZMQ_HAVE_WINDOWS
655 int rc = select (0, &inset, &outset, &errset, ptimeout);
656 if (unlikely (rc == SOCKET_ERROR)) {
657 zmq::wsa_error_to_errno ();
658 if (errno == ENOTSOCK)
659 return -1;
660 wsa_assert (false);
661 }
662 #else
663 int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
664 if (unlikely (rc == -1)) {
665 if (errno == EINTR || errno == EBADF)
666 return -1;
667 errno_assert (false);
668 }
669 #endif
670 break;
671 }
672
673 // Check for the events.
674 for (int i = 0; i != nitems_; i++) {
675
676 items_ [i].revents = 0;
677
678 // The poll item is a 0MQ socket. Retrieve pending events
679 // using the ZMQ_EVENTS socket option.
680 if (items_ [i].socket) {
681 size_t zmq_events_size = sizeof (uint32_t);
682 uint32_t zmq_events;
683 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
684 &zmq_events_size) == -1)
685 return -1;
686 if ((items_ [i].events & ZMQ_POLLOUT) &&
687 (zmq_events & ZMQ_POLLOUT))
688 items_ [i].revents |= ZMQ_POLLOUT;
689 if ((items_ [i].events & ZMQ_POLLIN) &&
690 (zmq_events & ZMQ_POLLIN))
691 items_ [i].revents |= ZMQ_POLLIN;
692 }
693 // Else, the poll item is a raw file descriptor, simply convert
694 // the events to zmq_pollitem_t-style format.
695 else {
696 if (FD_ISSET (items_ [i].fd, &inset))
697 items_ [i].revents |= ZMQ_POLLIN;
698 if (FD_ISSET (items_ [i].fd, &outset))
699 items_ [i].revents |= ZMQ_POLLOUT;
700 if (FD_ISSET (items_ [i].fd, &errset))
701 items_ [i].revents |= ZMQ_POLLERR;
702 }
703
704 if (items_ [i].revents)
705 nevents++;
706 }
707
708 // If timout is zero, exit immediately whether there are events or not.
709 if (timeout_ == 0)
710 break;
711
712 // If there are events to return, we can exit immediately.
713 if (nevents)
714 break;
715
716 // At this point we are meant to wait for events but there are none.
717 // If timeout is infinite we can just loop until we get some events.
718 if (timeout_ < 0) {
719 if (first_pass)
720 first_pass = false;
721 continue;
722 }
723
724 // The timeout is finite and there are no events. In the first pass
725 // we get a timestamp of when the polling have begun. (We assume that
726 // first pass have taken negligible time). We also compute the time
727 // when the polling should time out.
728 if (first_pass) {
729 now = clock.now_ms ();
730 end = now + (timeout_ / 1000);
731 if (now == end)
732 break;
733 first_pass = false;
734 continue;
735 }
736
737 // Find out whether timeout have expired.
738 now = clock.now_ms ();
739 if (now >= end)
740 break;
741 }
742
743 return nevents;
744
745 #else
746 // Exotic platforms that support neither poll() nor select().
747 errno = ENOTSUP;
748 return -1;
749 #endif
750 }
751
752 #if defined ZMQ_POLL_BASED_ON_SELECT
753 #undef ZMQ_POLL_BASED_ON_SELECT
754 #endif
755 #if defined ZMQ_POLL_BASED_ON_POLL
756 #undef ZMQ_POLL_BASED_ON_POLL
757 #endif
758
// Returns the current value of errno as seen by this library.
int zmq_errno ()
{
    const int current_error = errno;
    return current_error;
}
763
zmq_device(int device_,void * insocket_,void * outsocket_)764 int zmq_device (int device_, void *insocket_, void *outsocket_)
765 {
766 if (!insocket_ || !outsocket_) {
767 errno = EFAULT;
768 return -1;
769 }
770
771 if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
772 device_ != ZMQ_STREAMER) {
773 errno = EINVAL;
774 return -1;
775 }
776
777 return zmq::device ((zmq::socket_base_t*) insocket_,
778 (zmq::socket_base_t*) outsocket_);
779 }
780
781 ////////////////////////////////////////////////////////////////////////////////
782 // 0MQ utils - to be used by perf tests
783 ////////////////////////////////////////////////////////////////////////////////
784
// Suspends the calling thread for seconds_ seconds, using Sleep () on
// Windows and sleep () elsewhere.
void zmq_sleep (int seconds_)
{
#if defined ZMQ_HAVE_WINDOWS
    // Sleep () takes milliseconds.
    Sleep (seconds_ * 1000);
#else
    sleep (seconds_);
#endif
}
793
zmq_stopwatch_start()794 void *zmq_stopwatch_start ()
795 {
796 uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
797 alloc_assert (watch);
798 *watch = zmq::clock_t::now_us ();
799 return (void*) watch;
800 }
801
zmq_stopwatch_stop(void * watch_)802 unsigned long zmq_stopwatch_stop (void *watch_)
803 {
804 uint64_t end = zmq::clock_t::now_us ();
805 uint64_t start = *(uint64_t*) watch_;
806 free (watch_);
807 return (unsigned long) (end - start);
808 }
809