1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 #include <algorithm>
34 #include <limits>
35
36 #include "macros.hpp"
37
38 #if defined ZMQ_HAVE_WINDOWS
39 #if defined _MSC_VER
40 #if defined _WIN32_WCE
41 #include <cmnintrin.h>
42 #else
43 #include <intrin.h>
44 #endif
45 #endif
46 #else
47 #include <unistd.h>
48 #include <ctype.h>
49 #endif
50
51 #include "socket_base.hpp"
52 #include "tcp_listener.hpp"
53 #include "ws_listener.hpp"
54 #include "ipc_listener.hpp"
55 #include "tipc_listener.hpp"
56 #include "tcp_connecter.hpp"
57 #ifdef ZMQ_HAVE_WS
58 #include "ws_address.hpp"
59 #endif
60 #include "io_thread.hpp"
61 #include "session_base.hpp"
62 #include "config.hpp"
63 #include "pipe.hpp"
64 #include "err.hpp"
65 #include "ctx.hpp"
66 #include "likely.hpp"
67 #include "msg.hpp"
68 #include "address.hpp"
69 #include "ipc_address.hpp"
70 #include "tcp_address.hpp"
71 #include "udp_address.hpp"
72 #include "tipc_address.hpp"
73 #include "mailbox.hpp"
74 #include "mailbox_safe.hpp"
75
76 #ifdef ZMQ_HAVE_WSS
77 #include "wss_address.hpp"
78 #endif
79 #if defined ZMQ_HAVE_VMCI
80 #include "vmci_address.hpp"
81 #include "vmci_listener.hpp"
82 #endif
83
84 #ifdef ZMQ_HAVE_OPENPGM
85 #include "pgm_socket.hpp"
86 #endif
87
88 #include "pair.hpp"
89 #include "pub.hpp"
90 #include "sub.hpp"
91 #include "req.hpp"
92 #include "rep.hpp"
93 #include "pull.hpp"
94 #include "push.hpp"
95 #include "dealer.hpp"
96 #include "router.hpp"
97 #include "xpub.hpp"
98 #include "xsub.hpp"
99 #include "stream.hpp"
100 #include "server.hpp"
101 #include "client.hpp"
102 #include "radio.hpp"
103 #include "dish.hpp"
104 #include "gather.hpp"
105 #include "scatter.hpp"
106 #include "dgram.hpp"
107 #include "peer.hpp"
108 #include "channel.hpp"
109
emplace(const char * endpoint_uri_,pipe_t * pipe_)110 void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
111 pipe_t *pipe_)
112 {
113 _inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
114 }
115
erase_pipes(const std::string & endpoint_uri_str_)116 int zmq::socket_base_t::inprocs_t::erase_pipes (
117 const std::string &endpoint_uri_str_)
118 {
119 const std::pair<map_t::iterator, map_t::iterator> range =
120 _inprocs.equal_range (endpoint_uri_str_);
121 if (range.first == range.second) {
122 errno = ENOENT;
123 return -1;
124 }
125
126 for (map_t::iterator it = range.first; it != range.second; ++it) {
127 it->second->send_disconnect_msg ();
128 it->second->terminate (true);
129 }
130 _inprocs.erase (range.first, range.second);
131 return 0;
132 }
133
erase_pipe(const pipe_t * pipe_)134 void zmq::socket_base_t::inprocs_t::erase_pipe (const pipe_t *pipe_)
135 {
136 for (map_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
137 it != end; ++it)
138 if (it->second == pipe_) {
139 _inprocs.erase (it);
140 break;
141 }
142 }
143
check_tag() const144 bool zmq::socket_base_t::check_tag () const
145 {
146 return _tag == 0xbaddecaf;
147 }
148
is_thread_safe() const149 bool zmq::socket_base_t::is_thread_safe () const
150 {
151 return _thread_safe;
152 }
153
create(int type_,class ctx_t * parent_,uint32_t tid_,int sid_)154 zmq::socket_base_t *zmq::socket_base_t::create (int type_,
155 class ctx_t *parent_,
156 uint32_t tid_,
157 int sid_)
158 {
159 socket_base_t *s = NULL;
160 switch (type_) {
161 case ZMQ_PAIR:
162 s = new (std::nothrow) pair_t (parent_, tid_, sid_);
163 break;
164 case ZMQ_PUB:
165 s = new (std::nothrow) pub_t (parent_, tid_, sid_);
166 break;
167 case ZMQ_SUB:
168 s = new (std::nothrow) sub_t (parent_, tid_, sid_);
169 break;
170 case ZMQ_REQ:
171 s = new (std::nothrow) req_t (parent_, tid_, sid_);
172 break;
173 case ZMQ_REP:
174 s = new (std::nothrow) rep_t (parent_, tid_, sid_);
175 break;
176 case ZMQ_DEALER:
177 s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
178 break;
179 case ZMQ_ROUTER:
180 s = new (std::nothrow) router_t (parent_, tid_, sid_);
181 break;
182 case ZMQ_PULL:
183 s = new (std::nothrow) pull_t (parent_, tid_, sid_);
184 break;
185 case ZMQ_PUSH:
186 s = new (std::nothrow) push_t (parent_, tid_, sid_);
187 break;
188 case ZMQ_XPUB:
189 s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
190 break;
191 case ZMQ_XSUB:
192 s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
193 break;
194 case ZMQ_STREAM:
195 s = new (std::nothrow) stream_t (parent_, tid_, sid_);
196 break;
197 case ZMQ_SERVER:
198 s = new (std::nothrow) server_t (parent_, tid_, sid_);
199 break;
200 case ZMQ_CLIENT:
201 s = new (std::nothrow) client_t (parent_, tid_, sid_);
202 break;
203 case ZMQ_RADIO:
204 s = new (std::nothrow) radio_t (parent_, tid_, sid_);
205 break;
206 case ZMQ_DISH:
207 s = new (std::nothrow) dish_t (parent_, tid_, sid_);
208 break;
209 case ZMQ_GATHER:
210 s = new (std::nothrow) gather_t (parent_, tid_, sid_);
211 break;
212 case ZMQ_SCATTER:
213 s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
214 break;
215 case ZMQ_DGRAM:
216 s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
217 break;
218 case ZMQ_PEER:
219 s = new (std::nothrow) peer_t (parent_, tid_, sid_);
220 break;
221 case ZMQ_CHANNEL:
222 s = new (std::nothrow) channel_t (parent_, tid_, sid_);
223 break;
224 default:
225 errno = EINVAL;
226 return NULL;
227 }
228
229 alloc_assert (s);
230
231 if (s->_mailbox == NULL) {
232 s->_destroyed = true;
233 LIBZMQ_DELETE (s);
234 return NULL;
235 }
236
237 return s;
238 }
239
//  Construct the socket shell. Concrete socket types derive from this and
//  are produced via socket_base_t::create (). On mailbox allocation
//  failure _mailbox is left NULL; create () detects that and destroys
//  the half-built socket.
zmq::socket_base_t::socket_base_t (ctx_t *parent_,
                                   uint32_t tid_,
                                   int sid_,
                                   bool thread_safe_) :
    own_t (parent_, tid_),
    _sync (),
    _tag (0xbaddecaf), //  magic tag validated by check_tag ()
    _ctx_terminated (false),
    _destroyed (false),
    _poller (NULL),
    _handle (static_cast<poller_t::handle_t> (NULL)),
    _last_tsc (0),
    _ticks (0),
    _rcvmore (false),
    _monitor_socket (NULL),
    _monitor_events (0),
    _thread_safe (thread_safe_),
    _reaper_signaler (NULL),
    _monitor_sync ()
{
    options.socket_id = sid_;
    //  Inherit context-wide defaults into the per-socket options.
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
    options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
    options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;

    if (_thread_safe) {
        //  Thread-safe sockets share _sync with their mailbox.
        _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
        zmq_assert (_mailbox);
    } else {
        mailbox_t *m = new (std::nothrow) mailbox_t ();
        zmq_assert (m);

        //  The plain mailbox may fail to acquire its signalling fd; in that
        //  case leave _mailbox NULL so create () can report the failure.
        if (m->get_fd () != retired_fd)
            _mailbox = m;
        else {
            LIBZMQ_DELETE (m);
            _mailbox = NULL;
        }
    }
}
280
get_peer_state(const void * routing_id_,size_t routing_id_size_) const281 int zmq::socket_base_t::get_peer_state (const void *routing_id_,
282 size_t routing_id_size_) const
283 {
284 LIBZMQ_UNUSED (routing_id_);
285 LIBZMQ_UNUSED (routing_id_size_);
286
287 // Only ROUTER sockets support this
288 errno = ENOTSUP;
289 return -1;
290 }
291
zmq::socket_base_t::~socket_base_t ()
{
    //  Release the command mailbox and the reaper signaler, if allocated.
    if (_mailbox)
        LIBZMQ_DELETE (_mailbox);

    if (_reaper_signaler)
        LIBZMQ_DELETE (_reaper_signaler);

    //  Tear down the monitor endpoint under the monitor lock so no
    //  concurrent monitor call races with destruction.
    scoped_lock_t lock (_monitor_sync);
    stop_monitor ();

    //  The socket must have gone through the term/reap sequence first.
    zmq_assert (_destroyed);
}
305
get_mailbox() const306 zmq::i_mailbox *zmq::socket_base_t::get_mailbox () const
307 {
308 return _mailbox;
309 }
310
stop()311 void zmq::socket_base_t::stop ()
312 {
313 // Called by ctx when it is terminated (zmq_ctx_term).
314 // 'stop' command is sent from the threads that called zmq_ctx_term to
315 // the thread owning the socket. This way, blocking call in the
316 // owner thread can be interrupted.
317 send_stop ();
318 }
319
320 // TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
321 // terminology, but this requires extensive changes to be consistent
parse_uri(const char * uri_,std::string & protocol_,std::string & path_)322 int zmq::socket_base_t::parse_uri (const char *uri_,
323 std::string &protocol_,
324 std::string &path_)
325 {
326 zmq_assert (uri_ != NULL);
327
328 const std::string uri (uri_);
329 const std::string::size_type pos = uri.find ("://");
330 if (pos == std::string::npos) {
331 errno = EINVAL;
332 return -1;
333 }
334 protocol_ = uri.substr (0, pos);
335 path_ = uri.substr (pos + 3);
336
337 if (protocol_.empty () || path_.empty ()) {
338 errno = EINVAL;
339 return -1;
340 }
341 return 0;
342 }
343
//  Validate that the given transport protocol is both compiled into this
//  build and compatible with this socket's type. Returns 0 when usable,
//  -1 with errno = EPROTONOSUPPORT (unknown/unavailable transport) or
//  ENOCOMPATPROTO (transport/socket-type mismatch).
int zmq::socket_base_t::check_protocol (const std::string &protocol_) const
{
    //  First check out whether the protocol is something we are aware of.
    if (protocol_ != protocol_name::inproc
#if defined ZMQ_HAVE_IPC
        && protocol_ != protocol_name::ipc
#endif
        && protocol_ != protocol_name::tcp
#ifdef ZMQ_HAVE_WS
        && protocol_ != protocol_name::ws
#endif
#ifdef ZMQ_HAVE_WSS
        && protocol_ != protocol_name::wss
#endif
#if defined ZMQ_HAVE_OPENPGM
        //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
        && protocol_ != protocol_name::pgm
        && protocol_ != protocol_name::epgm
#endif
#if defined ZMQ_HAVE_TIPC
        //  TIPC transport is only available on Linux.
        && protocol_ != protocol_name::tipc
#endif
#if defined ZMQ_HAVE_NORM
        && protocol_ != protocol_name::norm
#endif
#if defined ZMQ_HAVE_VMCI
        && protocol_ != protocol_name::vmci
#endif
        && protocol_ != protocol_name::udp) {
        errno = EPROTONOSUPPORT;
        return -1;
    }

    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
#if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
    if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm
         || protocol_ == protocol_name::norm)
#elif defined ZMQ_HAVE_OPENPGM
    if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm)
#else // defined ZMQ_HAVE_NORM
    if (protocol_ == protocol_name::norm
#endif
        && options.type != ZMQ_PUB && options.type != ZMQ_SUB
        && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
        errno = ENOCOMPATPROTO;
        return -1;
    }
#endif

    //  UDP is usable only by the draft RADIO/DISH/DGRAM patterns.
    if (protocol_ == protocol_name::udp
        && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
            && options.type != ZMQ_DGRAM)) {
        errno = ENOCOMPATPROTO;
        return -1;
    }

    //  Protocol is available.
    return 0;
}
407
attach_pipe(pipe_t * pipe_,bool subscribe_to_all_,bool locally_initiated_)408 void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
409 bool subscribe_to_all_,
410 bool locally_initiated_)
411 {
412 // First, register the pipe so that we can terminate it later on.
413 pipe_->set_event_sink (this);
414 _pipes.push_back (pipe_);
415
416 // Let the derived socket type know about new pipe.
417 xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
418
419 // If the socket is already being closed, ask any new pipes to terminate
420 // straight away.
421 if (is_terminating ()) {
422 register_term_acks (1);
423 pipe_->terminate (false);
424 }
425 }
426
setsockopt(int option_,const void * optval_,size_t optvallen_)427 int zmq::socket_base_t::setsockopt (int option_,
428 const void *optval_,
429 size_t optvallen_)
430 {
431 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
432
433 if (unlikely (_ctx_terminated)) {
434 errno = ETERM;
435 return -1;
436 }
437
438 // First, check whether specific socket type overloads the option.
439 int rc = xsetsockopt (option_, optval_, optvallen_);
440 if (rc == 0 || errno != EINVAL) {
441 return rc;
442 }
443
444 // If the socket type doesn't support the option, pass it to
445 // the generic option parser.
446 rc = options.setsockopt (option_, optval_, optvallen_);
447 update_pipe_options (option_);
448
449 return rc;
450 }
451
getsockopt(int option_,void * optval_,size_t * optvallen_)452 int zmq::socket_base_t::getsockopt (int option_,
453 void *optval_,
454 size_t *optvallen_)
455 {
456 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
457
458 if (unlikely (_ctx_terminated)) {
459 errno = ETERM;
460 return -1;
461 }
462
463 if (option_ == ZMQ_RCVMORE) {
464 return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
465 }
466
467 if (option_ == ZMQ_FD) {
468 if (_thread_safe) {
469 // thread safe socket doesn't provide file descriptor
470 errno = EINVAL;
471 return -1;
472 }
473
474 return do_getsockopt<fd_t> (
475 optval_, optvallen_,
476 (static_cast<mailbox_t *> (_mailbox))->get_fd ());
477 }
478
479 if (option_ == ZMQ_EVENTS) {
480 const int rc = process_commands (0, false);
481 if (rc != 0 && (errno == EINTR || errno == ETERM)) {
482 return -1;
483 }
484 errno_assert (rc == 0);
485
486 return do_getsockopt<int> (optval_, optvallen_,
487 (has_out () ? ZMQ_POLLOUT : 0)
488 | (has_in () ? ZMQ_POLLIN : 0));
489 }
490
491 if (option_ == ZMQ_LAST_ENDPOINT) {
492 return do_getsockopt (optval_, optvallen_, _last_endpoint);
493 }
494
495 if (option_ == ZMQ_THREAD_SAFE) {
496 return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
497 }
498
499 return options.getsockopt (option_, optval_, optvallen_);
500 }
501
join(const char * group_)502 int zmq::socket_base_t::join (const char *group_)
503 {
504 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
505
506 return xjoin (group_);
507 }
508
leave(const char * group_)509 int zmq::socket_base_t::leave (const char *group_)
510 {
511 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
512
513 return xleave (group_);
514 }
515
add_signaler(signaler_t * s_)516 void zmq::socket_base_t::add_signaler (signaler_t *s_)
517 {
518 zmq_assert (_thread_safe);
519
520 scoped_lock_t sync_lock (_sync);
521 (static_cast<mailbox_safe_t *> (_mailbox))->add_signaler (s_);
522 }
523
remove_signaler(signaler_t * s_)524 void zmq::socket_base_t::remove_signaler (signaler_t *s_)
525 {
526 zmq_assert (_thread_safe);
527
528 scoped_lock_t sync_lock (_sync);
529 (static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
530 }
531
//  Bind the socket to the given endpoint URI. On success 0 is returned
//  and _last_endpoint records the (possibly resolved) endpoint; on
//  failure -1 is returned with errno set.
int zmq::socket_base_t::bind (const char *endpoint_uri_)
{
    //  Thread-safe sockets serialise all API calls on _sync.
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Process pending commands, if any.
    int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        return -1;
    }

    //  Parse endpoint_uri_ string.
    std::string protocol;
    std::string address;
    if (parse_uri (endpoint_uri_, protocol, address)
        || check_protocol (protocol)) {
        return -1;
    }

    //  inproc endpoints are registered with the context directly; no I/O
    //  thread or listener object is involved.
    if (protocol == protocol_name::inproc) {
        const endpoint_t endpoint = {this, options};
        rc = register_endpoint (endpoint_uri_, endpoint);
        if (rc == 0) {
            //  Complete any connects that were issued before this bind.
            connect_pending (endpoint_uri_, this);
            _last_endpoint.assign (endpoint_uri_);
            options.connected = true;
        }
        return rc;
    }

#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
#if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
    if (protocol == protocol_name::pgm || protocol == protocol_name::epgm
        || protocol == protocol_name::norm) {
#elif defined ZMQ_HAVE_OPENPGM
    if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
#else // defined ZMQ_HAVE_NORM
    if (protocol == protocol_name::norm) {
#endif
        //  For convenience's sake, bind can be used interchangeable with
        //  connect for PGM, EPGM, NORM transports.
        rc = connect (endpoint_uri_);
        if (rc != -1)
            options.connected = true;
        return rc;
    }
#endif

    if (protocol == protocol_name::udp) {
        //  Only DGRAM and DISH sockets may bind over UDP.
        if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
            errno = ENOCOMPATPROTO;
            return -1;
        }

        //  Choose the I/O thread to run the session in.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }

        address_t *paddr =
          new (std::nothrow) address_t (protocol, address, this->get_ctx ());
        alloc_assert (paddr);

        //  Resolve the UDP address now ('true' marks the bind side).
        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
                                                options.ipv6);
        if (rc != 0) {
            LIBZMQ_DELETE (paddr);
            return -1;
        }

        //  UDP has no listener; a session drives the socket directly.
        session_base_t *session =
          session_base_t::create (io_thread, true, this, options, paddr);
        errno_assert (session);

        //  Create a bi-directional pipe.
        object_t *parents[2] = {this, session};
        pipe_t *new_pipes[2] = {NULL, NULL};

        int hwms[2] = {options.sndhwm, options.rcvhwm};
        bool conflates[2] = {false, false};
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
        attach_pipe (new_pipes[0], true, true);
        pipe_t *const newpipe = new_pipes[0];

        //  Attach remote end of the pipe to the session object later on.
        session->attach_pipe (new_pipes[1]);

        //  Save last endpoint URI
        paddr->to_string (_last_endpoint);

        //  TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases
        add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (),
                                           endpoint_type_none),
                      static_cast<own_t *> (session), newpipe);

        return 0;
    }

    //  Remaining transports require to be run in an I/O thread, so at this
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

    if (protocol == protocol_name::tcp) {
        tcp_listener_t *listener =
          new (std::nothrow) tcp_listener_t (io_thread, this, options);
        alloc_assert (listener);
        rc = listener->set_local_address (address.c_str ());
        if (rc != 0) {
            //  Report the failure to any monitor before bailing out.
            LIBZMQ_DELETE (listener);
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
                               zmq_errno ());
            return -1;
        }

        //  Save last endpoint URI
        listener->get_local_address (_last_endpoint);

        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
                      static_cast<own_t *> (listener), NULL);
        options.connected = true;
        return 0;
    }

#ifdef ZMQ_HAVE_WS
#ifdef ZMQ_HAVE_WSS
    //  NOTE: the if-statement and its opening brace are split across the
    //  preprocessor branches below; both variants share the body that
    //  follows the #endif.
    if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
        ws_listener_t *listener = new (std::nothrow) ws_listener_t (
          io_thread, this, options, protocol == protocol_name::wss);
#else
    if (protocol == protocol_name::ws) {
        ws_listener_t *listener =
          new (std::nothrow) ws_listener_t (io_thread, this, options, false);
#endif
        alloc_assert (listener);
        rc = listener->set_local_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (listener);
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
                               zmq_errno ());
            return -1;
        }

        //  Save last endpoint URI
        listener->get_local_address (_last_endpoint);

        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
                      static_cast<own_t *> (listener), NULL);
        options.connected = true;
        return 0;
    }
#endif

#if defined ZMQ_HAVE_IPC
    if (protocol == protocol_name::ipc) {
        ipc_listener_t *listener =
          new (std::nothrow) ipc_listener_t (io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_local_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (listener);
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
                               zmq_errno ());
            return -1;
        }

        //  Save last endpoint URI
        listener->get_local_address (_last_endpoint);

        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
                      static_cast<own_t *> (listener), NULL);
        options.connected = true;
        return 0;
    }
#endif
#if defined ZMQ_HAVE_TIPC
    if (protocol == protocol_name::tipc) {
        tipc_listener_t *listener =
          new (std::nothrow) tipc_listener_t (io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_local_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (listener);
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
                               zmq_errno ());
            return -1;
        }

        //  Save last endpoint URI
        listener->get_local_address (_last_endpoint);

        //  TODO shouldn't this use _last_endpoint as in the other cases?
        add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_),
                      static_cast<own_t *> (listener), NULL);
        options.connected = true;
        return 0;
    }
#endif
#if defined ZMQ_HAVE_VMCI
    if (protocol == protocol_name::vmci) {
        vmci_listener_t *listener =
          new (std::nothrow) vmci_listener_t (io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_local_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (listener);
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
                               zmq_errno ());
            return -1;
        }

        listener->get_local_address (_last_endpoint);

        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
                      static_cast<own_t *> (listener), NULL);
        options.connected = true;
        return 0;
    }
#endif

    //  check_protocol () accepted the transport, so one of the branches
    //  above must have handled it; reaching here is a programming error.
    zmq_assert (false);
    return -1;
}
769
770 int zmq::socket_base_t::connect (const char *endpoint_uri_)
771 {
772 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
773 return connect_internal (endpoint_uri_);
774 }
775
776 int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
777 {
778 if (unlikely (_ctx_terminated)) {
779 errno = ETERM;
780 return -1;
781 }
782
783 // Process pending commands, if any.
784 int rc = process_commands (0, false);
785 if (unlikely (rc != 0)) {
786 return -1;
787 }
788
789 // Parse endpoint_uri_ string.
790 std::string protocol;
791 std::string address;
792 if (parse_uri (endpoint_uri_, protocol, address)
793 || check_protocol (protocol)) {
794 return -1;
795 }
796
797 if (protocol == protocol_name::inproc) {
798 // TODO: inproc connect is specific with respect to creating pipes
799 // as there's no 'reconnect' functionality implemented. Once that
800 // is in place we should follow generic pipe creation algorithm.
801
802 // Find the peer endpoint.
803 const endpoint_t peer = find_endpoint (endpoint_uri_);
804
805 // The total HWM for an inproc connection should be the sum of
806 // the binder's HWM and the connector's HWM.
807 const int sndhwm = peer.socket == NULL
808 ? options.sndhwm
809 : options.sndhwm != 0 && peer.options.rcvhwm != 0
810 ? options.sndhwm + peer.options.rcvhwm
811 : 0;
812 const int rcvhwm = peer.socket == NULL
813 ? options.rcvhwm
814 : options.rcvhwm != 0 && peer.options.sndhwm != 0
815 ? options.rcvhwm + peer.options.sndhwm
816 : 0;
817
818 // Create a bi-directional pipe to connect the peers.
819 object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
820 pipe_t *new_pipes[2] = {NULL, NULL};
821
822 const bool conflate = get_effective_conflate_option (options);
823
824 int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
825 bool conflates[2] = {conflate, conflate};
826 rc = pipepair (parents, new_pipes, hwms, conflates);
827 if (!conflate) {
828 new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
829 peer.options.rcvhwm);
830 new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
831 }
832
833 errno_assert (rc == 0);
834
835 if (!peer.socket) {
836 // The peer doesn't exist yet so we don't know whether
837 // to send the routing id message or not. To resolve this,
838 // we always send our routing id and drop it later if
839 // the peer doesn't expect it.
840 send_routing_id (new_pipes[0], options);
841
842 #ifdef ZMQ_BUILD_DRAFT_API
843 // If set, send the hello msg of the local socket to the peer.
844 if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
845 send_hello_msg (new_pipes[0], options);
846 }
847 #endif
848
849 const endpoint_t endpoint = {this, options};
850 pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
851 } else {
852 // If required, send the routing id of the local socket to the peer.
853 if (peer.options.recv_routing_id) {
854 send_routing_id (new_pipes[0], options);
855 }
856
857 // If required, send the routing id of the peer to the local socket.
858 if (options.recv_routing_id) {
859 send_routing_id (new_pipes[1], peer.options);
860 }
861
862 #ifdef ZMQ_BUILD_DRAFT_API
863 // If set, send the hello msg of the local socket to the peer.
864 if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
865 send_hello_msg (new_pipes[0], options);
866 }
867
868 // If set, send the hello msg of the peer to the local socket.
869 if (peer.options.can_send_hello_msg
870 && peer.options.hello_msg.size () > 0) {
871 send_hello_msg (new_pipes[1], peer.options);
872 }
873
874 if (peer.options.can_recv_disconnect_msg
875 && peer.options.disconnect_msg.size () > 0)
876 new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg);
877 #endif
878
879 // Attach remote end of the pipe to the peer socket. Note that peer's
880 // seqnum was incremented in find_endpoint function. We don't need it
881 // increased here.
882 send_bind (peer.socket, new_pipes[1], false);
883 }
884
885 // Attach local end of the pipe to this socket object.
886 attach_pipe (new_pipes[0], false, true);
887
888 // Save last endpoint URI
889 _last_endpoint.assign (endpoint_uri_);
890
891 // remember inproc connections for disconnect
892 _inprocs.emplace (endpoint_uri_, new_pipes[0]);
893
894 options.connected = true;
895 return 0;
896 }
897 const bool is_single_connect =
898 (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
899 || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
900 if (unlikely (is_single_connect)) {
901 if (0 != _endpoints.count (endpoint_uri_)) {
902 // There is no valid use for multiple connects for SUB-PUB nor
903 // DEALER-ROUTER nor REQ-REP. Multiple connects produces
904 // nonsensical results.
905 return 0;
906 }
907 }
908
909 // Choose the I/O thread to run the session in.
910 io_thread_t *io_thread = choose_io_thread (options.affinity);
911 if (!io_thread) {
912 errno = EMTHREAD;
913 return -1;
914 }
915
916 address_t *paddr =
917 new (std::nothrow) address_t (protocol, address, this->get_ctx ());
918 alloc_assert (paddr);
919
920 // Resolve address (if needed by the protocol)
921 if (protocol == protocol_name::tcp) {
922 // Do some basic sanity checks on tcp:// address syntax
923 // - hostname starts with digit or letter, with embedded '-' or '.'
924 // - IPv6 address may contain hex chars and colons.
925 // - IPv6 link local address may contain % followed by interface name / zone_id
926 // (Reference: https://tools.ietf.org/html/rfc4007)
927 // - IPv4 address may contain decimal digits and dots.
928 // - Address must end in ":port" where port is *, or numeric
929 // - Address may contain two parts separated by ':'
930 // Following code is quick and dirty check to catch obvious errors,
931 // without trying to be fully accurate.
932 const char *check = address.c_str ();
933 if (isalnum (*check) || isxdigit (*check) || *check == '['
934 || *check == ':') {
935 check++;
936 while (isalnum (*check) || isxdigit (*check) || *check == '.'
937 || *check == '-' || *check == ':' || *check == '%'
938 || *check == ';' || *check == '[' || *check == ']'
939 || *check == '_' || *check == '*') {
940 check++;
941 }
942 }
943 // Assume the worst, now look for success
944 rc = -1;
945 // Did we reach the end of the address safely?
946 if (*check == 0) {
947 // Do we have a valid port string? (cannot be '*' in connect
948 check = strrchr (address.c_str (), ':');
949 if (check) {
950 check++;
951 if (*check && (isdigit (*check)))
952 rc = 0; // Valid
953 }
954 }
955 if (rc == -1) {
956 errno = EINVAL;
957 LIBZMQ_DELETE (paddr);
958 return -1;
959 }
960 // Defer resolution until a socket is opened
961 paddr->resolved.tcp_addr = NULL;
962 }
963 #ifdef ZMQ_HAVE_WS
964 #ifdef ZMQ_HAVE_WSS
965 else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
966 if (protocol == protocol_name::wss) {
967 paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
968 alloc_assert (paddr->resolved.wss_addr);
969 rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
970 options.ipv6);
971 } else
972 #else
973 else if (protocol == protocol_name::ws) {
974 #endif
975 {
976 paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
977 alloc_assert (paddr->resolved.ws_addr);
978 rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
979 options.ipv6);
980 }
981
982 if (rc != 0) {
983 LIBZMQ_DELETE (paddr);
984 return -1;
985 }
986 }
987 #endif
988
989 #if defined ZMQ_HAVE_IPC
990 else if (protocol == protocol_name::ipc) {
991 paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
992 alloc_assert (paddr->resolved.ipc_addr);
993 int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
994 if (rc != 0) {
995 LIBZMQ_DELETE (paddr);
996 return -1;
997 }
998 }
999 #endif
1000
1001 if (protocol == protocol_name::udp) {
1002 if (options.type != ZMQ_RADIO) {
1003 errno = ENOCOMPATPROTO;
1004 LIBZMQ_DELETE (paddr);
1005 return -1;
1006 }
1007
1008 paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
1009 alloc_assert (paddr->resolved.udp_addr);
1010 rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
1011 options.ipv6);
1012 if (rc != 0) {
1013 LIBZMQ_DELETE (paddr);
1014 return -1;
1015 }
1016 }
1017
1018 // TBD - Should we check address for ZMQ_HAVE_NORM???
1019
1020 #ifdef ZMQ_HAVE_OPENPGM
1021 if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
1022 struct pgm_addrinfo_t *res = NULL;
1023 uint16_t port_number = 0;
1024 int rc =
1025 pgm_socket_t::init_address (address.c_str (), &res, &port_number);
1026 if (res != NULL)
1027 pgm_freeaddrinfo (res);
1028 if (rc != 0 || port_number == 0) {
1029 return -1;
1030 }
1031 }
1032 #endif
1033 #if defined ZMQ_HAVE_TIPC
1034 else if (protocol == protocol_name::tipc) {
1035 paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
1036 alloc_assert (paddr->resolved.tipc_addr);
1037 int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
1038 if (rc != 0) {
1039 LIBZMQ_DELETE (paddr);
1040 return -1;
1041 }
1042 const sockaddr_tipc *const saddr =
1043 reinterpret_cast<const sockaddr_tipc *> (
1044 paddr->resolved.tipc_addr->addr ());
1045 // Cannot connect to random Port Identity
1046 if (saddr->addrtype == TIPC_ADDR_ID
1047 && paddr->resolved.tipc_addr->is_random ()) {
1048 LIBZMQ_DELETE (paddr);
1049 errno = EINVAL;
1050 return -1;
1051 }
1052 }
1053 #endif
1054 #if defined ZMQ_HAVE_VMCI
1055 else if (protocol == protocol_name::vmci) {
1056 paddr->resolved.vmci_addr =
1057 new (std::nothrow) vmci_address_t (this->get_ctx ());
1058 alloc_assert (paddr->resolved.vmci_addr);
1059 int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
1060 if (rc != 0) {
1061 LIBZMQ_DELETE (paddr);
1062 return -1;
1063 }
1064 }
1065 #endif
1066
1067 // Create session.
1068 session_base_t *session =
1069 session_base_t::create (io_thread, true, this, options, paddr);
1070 errno_assert (session);
1071
1072 // PGM does not support subscription forwarding; ask for all data to be
1073 // sent to this pipe. (same for NORM, currently?)
1074 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
1075 const bool subscribe_to_all =
1076 protocol == protocol_name::pgm || protocol == protocol_name::epgm
1077 || protocol == protocol_name::norm || protocol == protocol_name::udp;
1078 #elif defined ZMQ_HAVE_OPENPGM
1079 const bool subscribe_to_all = protocol == protocol_name::pgm
1080 || protocol == protocol_name::epgm
1081 || protocol == protocol_name::udp;
1082 #elif defined ZMQ_HAVE_NORM
1083 const bool subscribe_to_all =
1084 protocol == protocol_name::norm || protocol == protocol_name::udp;
1085 #else
1086 const bool subscribe_to_all = protocol == protocol_name::udp;
1087 #endif
1088 pipe_t *newpipe = NULL;
1089
1090 if (options.immediate != 1 || subscribe_to_all) {
1091 // Create a bi-directional pipe.
1092 object_t *parents[2] = {this, session};
1093 pipe_t *new_pipes[2] = {NULL, NULL};
1094
1095 const bool conflate = get_effective_conflate_option (options);
1096
1097 int hwms[2] = {conflate ? -1 : options.sndhwm,
1098 conflate ? -1 : options.rcvhwm};
1099 bool conflates[2] = {conflate, conflate};
1100 rc = pipepair (parents, new_pipes, hwms, conflates);
1101 errno_assert (rc == 0);
1102
1103 // Attach local end of the pipe to the socket object.
1104 attach_pipe (new_pipes[0], subscribe_to_all, true);
1105 newpipe = new_pipes[0];
1106
1107 // Attach remote end of the pipe to the session object later on.
1108 session->attach_pipe (new_pipes[1]);
1109 }
1110
1111 // Save last endpoint URI
1112 paddr->to_string (_last_endpoint);
1113
1114 add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
1115 static_cast<own_t *> (session), newpipe);
1116 return 0;
1117 }
1118
1119 std::string
1120 zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_pair_,
1121 const char *tcp_address_)
1122 {
1123 // The resolved last_endpoint is used as a key in the endpoints map.
1124 // The address passed by the user might not match in the TCP case due to
1125 // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
1126 // resolve before giving up. Given at this stage we don't know whether a
1127 // socket is connected or bound, try with both.
1128 if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1129 tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
1130 alloc_assert (tcp_addr);
1131 int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
1132
1133 if (rc == 0) {
1134 tcp_addr->to_string (endpoint_uri_pair_);
1135 if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1136 rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
1137 if (rc == 0) {
1138 tcp_addr->to_string (endpoint_uri_pair_);
1139 }
1140 }
1141 }
1142 LIBZMQ_DELETE (tcp_addr);
1143 }
1144 return endpoint_uri_pair_;
1145 }
1146
void zmq::socket_base_t::add_endpoint (
  const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
{
    //  Registers a newly created endpoint (session/listener) with this
    //  socket so that term_endpoint () can later find and terminate it.
    //  endpoint_pair_: local/remote URI pair identifying the endpoint.
    //  endpoint_:     object that becomes a child of this socket.
    //  pipe_:         local pipe attached to the endpoint, or NULL.
    // Activate the session. Make it a child of this socket.
    launch_child (endpoint_);
    _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
                                          endpoint_pipe_t (endpoint_, pipe_));

    if (pipe_ != NULL)
        pipe_->set_endpoint_pair (endpoint_pair_);
}
1158
int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
{
    //  Terminates the endpoint(s) previously created on this socket for
    //  the given URI. Returns 0 on success, or -1 with errno set to ETERM
    //  (context shut down), EINVAL (NULL/invalid URI) or ENOENT (no such
    //  endpoint on this socket).
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    // Check whether the context hasn't been shut down yet.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    // Check whether endpoint address passed to the function is valid.
    if (unlikely (!endpoint_uri_)) {
        errno = EINVAL;
        return -1;
    }

    // Process pending commands, if any, since there could be pending unprocessed process_own()'s
    // (from launch_child() for example) we're asked to terminate now.
    const int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        return -1;
    }

    // Parse endpoint_uri_ string.
    std::string uri_protocol;
    std::string uri_path;
    if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
        || check_protocol (uri_protocol)) {
        return -1;
    }

    const std::string endpoint_uri_str = std::string (endpoint_uri_);

    // Disconnect an inproc socket
    if (uri_protocol == protocol_name::inproc) {
        return unregister_endpoint (endpoint_uri_str, this) == 0
                 ? 0
                 : _inprocs.erase_pipes (endpoint_uri_str);
    }

    //  TCP URIs are canonicalised first because the map key may hold the
    //  resolved form (e.g. IPv4-in-IPv6 mapped) rather than the user string.
    const std::string resolved_endpoint_uri =
      uri_protocol == protocol_name::tcp
        ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
        : endpoint_uri_str;

    // Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string.
    const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
      _endpoints.equal_range (resolved_endpoint_uri);
    if (range.first == range.second) {
        errno = ENOENT;
        return -1;
    }

    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        // If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
            it->second.second->terminate (false);
        term_child (it->second.first);
    }
    _endpoints.erase (range.first, range.second);

    if (options.reconnect_stop & ZMQ_RECONNECT_STOP_AFTER_DISCONNECT) {
        _disconnected = true;
    }

    return 0;
}
1226
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    //  Sends msg_ on this socket, honouring ZMQ_SNDMORE/ZMQ_DONTWAIT in
    //  flags_ and the ZMQ_SNDTIMEO option. Returns 0 on success, -1 with
    //  errno set (ETERM, EFAULT, EAGAIN, ...) on failure.
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    // Check whether the context hasn't been shut down yet.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    // Check whether message passed to the function is valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    // Process pending commands, if any.
    int rc = process_commands (0, true);
    if (unlikely (rc != 0)) {
        return -1;
    }

    // Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

    // At this point we impose the flags on the message.
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    // Try to send the message using method in each socket class
    rc = xsend (msg_);
    if (rc == 0) {
        return 0;
    }
    // Special case for ZMQ_PUSH: -2 means pipe is dead while a
    // multi-part send is in progress and can't be recovered, so drop
    // silently when in blocking mode to keep backward compatibility.
    if (unlikely (rc == -2)) {
        if (!((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)) {
            //  The message is dropped: release its content and leave an
            //  initialised (empty) message for the caller, reporting success.
            rc = msg_->close ();
            errno_assert (rc == 0);
            rc = msg_->init ();
            errno_assert (rc == 0);
            return 0;
        }
    }
    if (unlikely (errno != EAGAIN)) {
        return -1;
    }

    // In case of non-blocking send we'll simply propagate
    // the error - including EAGAIN - up the stack.
    if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
        return -1;
    }

    // Compute the time when the timeout should occur.
    // If the timeout is infinite, don't care.
    //  (timeout < 0 means wait forever; 'end' is then unused.)
    int timeout = options.sndtimeo;
    const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);

    // Oops, we couldn't send the message. Wait for the next
    // command, process it and try to send the message again.
    // If timeout is reached in the meantime, return EAGAIN.
    while (true) {
        if (unlikely (process_commands (timeout, false) != 0)) {
            return -1;
        }
        rc = xsend (msg_);
        if (rc == 0)
            break;
        if (unlikely (errno != EAGAIN)) {
            return -1;
        }
        if (timeout > 0) {
            //  Re-derive the remaining wait time from the deadline.
            timeout = static_cast<int> (end - _clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }

    return 0;
}
1314
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    //  Receives the next message into msg_, honouring ZMQ_DONTWAIT in
    //  flags_ and the ZMQ_RCVTIMEO option. Returns 0 on success, -1 with
    //  errno set (ETERM, EFAULT, EAGAIN, ...) on failure.
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    // Check whether the context hasn't been shut down yet.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    // Check whether message passed to the function is valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    // Once every inbound_poll_rate messages check for signals and process
    // incoming commands. This happens only if we are not polling altogether
    // because there are messages available all the time. If poll occurs,
    // ticks is set to zero and thus we avoid this code.
    //
    // Note that 'recv' uses different command throttling algorithm (the one
    // described above) from the one used by 'send'. This is because counting
    // ticks is more efficient than doing RDTSC all the time.
    if (++_ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0)) {
            return -1;
        }
        _ticks = 0;
    }

    // Get the message.
    int rc = xrecv (msg_);
    if (unlikely (rc != 0 && errno != EAGAIN)) {
        return -1;
    }

    // If we have the message, return immediately.
    if (rc == 0) {
        extract_flags (msg_);
        return 0;
    }

    // If the message cannot be fetched immediately, there are two scenarios.
    // For non-blocking recv, commands are processed in case there's an
    // activate_reader command already waiting in a command pipe.
    // If it's not, return EAGAIN.
    if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
        if (unlikely (process_commands (0, false) != 0)) {
            return -1;
        }
        _ticks = 0;

        rc = xrecv (msg_);
        if (rc < 0) {
            return rc;
        }
        extract_flags (msg_);

        return 0;
    }

    // Compute the time when the timeout should occur.
    // If the timeout is infinite, don't care.
    //  (timeout < 0 means wait forever; 'end' is then unused.)
    int timeout = options.rcvtimeo;
    const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);

    // In blocking scenario, commands are processed over and over again until
    // we are able to fetch a message.
    bool block = (_ticks != 0);
    while (true) {
        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
            return -1;
        }
        rc = xrecv (msg_);
        if (rc == 0) {
            _ticks = 0;
            break;
        }
        if (unlikely (errno != EAGAIN)) {
            return -1;
        }
        block = true;
        if (timeout > 0) {
            //  Re-derive the remaining wait time from the deadline.
            timeout = static_cast<int> (end - _clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }

    extract_flags (msg_);
    return 0;
}
1410
int zmq::socket_base_t::close ()
{
    //  Marks the socket for destruction. The actual tear-down happens
    //  asynchronously on the reaper thread; this call does not block.
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    // Remove all existing signalers for thread safe sockets
    if (_thread_safe)
        (static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();

    // Mark the socket as dead
    _tag = 0xdeadbeef;


    // Transfer the ownership of the socket from this application thread
    // to the reaper thread which will take care of the rest of shutdown
    // process.
    send_reap (this);

    return 0;
}
1430
bool zmq::socket_base_t::has_in ()
{
    //  Thin wrapper delegating to the socket-type-specific implementation.
    return xhas_in ();
}

bool zmq::socket_base_t::has_out ()
{
    //  Thin wrapper delegating to the socket-type-specific implementation.
    return xhas_out ();
}
1440
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Called once the socket has been handed to the reaper: registers a
    //  pollable fd with the reaper's poller and kicks off termination.
    // Plug the socket to the reaper thread.
    _poller = poller_;

    fd_t fd;

    if (!_thread_safe)
        fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();
    else {
        scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

        //  A thread-safe mailbox has no fd of its own; create a dedicated
        //  signaler the reaper can poll instead.
        _reaper_signaler = new (std::nothrow) signaler_t ();
        zmq_assert (_reaper_signaler);

        // Add signaler to the safe mailbox
        fd = _reaper_signaler->get_fd ();
        (static_cast<mailbox_safe_t *> (_mailbox))
          ->add_signaler (_reaper_signaler);

        // Send a signal to make sure reaper handle existing commands
        _reaper_signaler->send ();
    }

    _handle = _poller->add_fd (fd, this);
    _poller->set_pollin (_handle);

    // Initialise the termination and check whether it can be deallocated
    // immediately.
    terminate ();
    check_destroy ();
}
1473
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
    //  Drains this socket's mailbox and executes every queued command.
    //  timeout_:  wait time for the first mailbox recv () (0 = don't wait).
    //  throttle_: when true and timeout_ is 0, skip the check entirely if
    //             commands were processed recently (TSC-based throttle).
    //  Returns 0 on success, -1 with errno set to EINTR or ETERM.
    if (timeout_ == 0) {
        // If we are asked not to wait, check whether we haven't processed
        // commands recently, so that we can throttle the new commands.

        // Get the CPU's tick counter. If 0, the counter is not available.
        const uint64_t tsc = zmq::clock_t::rdtsc ();

        // Optimised version of command processing - it doesn't have to check
        // for incoming commands each time. It does so only if certain time
        // elapsed since last command processing. Command delay varies
        // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        // etc. The optimisation makes sense only on platforms where getting
        // a timestamp is a very cheap operation (tens of nanoseconds).
        if (tsc && throttle_) {
            // Check whether TSC haven't jumped backwards (in case of migration
            // between CPU cores) and whether certain time have elapsed since
            // last command processing. If it didn't do nothing.
            if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
                return 0;
            _last_tsc = tsc;
        }
    }

    // Check whether there are any commands pending for this thread.
    command_t cmd;
    int rc = _mailbox->recv (&cmd, timeout_);

    // Process all available commands.
    while (rc == 0) {
        cmd.destination->process_command (cmd);
        rc = _mailbox->recv (&cmd, 0);
    }

    //  The loop above exits when recv () fails: EINTR is propagated to the
    //  caller; anything other than EAGAIN (empty mailbox) is a bug.
    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);

    if (_ctx_terminated) {
        errno = ETERM;
        return -1;
    }

    return 0;
}
1521
void zmq::socket_base_t::process_stop ()
{
    // Here, someone have called zmq_ctx_term while the socket was still alive.
    // We'll remember the fact so that any blocking call is interrupted and any
    // further attempt to use the socket will return ETERM. The user is still
    // responsible for calling zmq_close on the socket though!
    //  The monitor is stopped under _monitor_sync before the termination
    //  flag is raised, so no further events are emitted afterwards.
    scoped_lock_t lock (_monitor_sync);
    stop_monitor ();

    _ctx_terminated = true;
}
1533
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    //  Handler for the 'bind' command: attaches the pipe delivered with
    //  the command to this socket.
    attach_pipe (pipe_);
}
1538
void zmq::socket_base_t::process_term (int linger_)
{
    //  Handler for the 'term' command: begins shutting the socket down.
    // Unregister all inproc endpoints associated with this socket.
    // Doing this we make sure that no new pipes from other sockets (inproc)
    // will be initiated.
    unregister_endpoints (this);

    // Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
        // Only inprocs might have a disconnect message set
        _pipes[i]->send_disconnect_msg ();
        _pipes[i]->terminate (false);
    }
    //  One termination ack is expected per attached pipe before the socket
    //  itself may be destroyed.
    register_term_acks (static_cast<int> (_pipes.size ()));

    // Continue the termination process immediately.
    own_t::process_term (linger_);
}
1557
1558 void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
1559 {
1560 term_endpoint (endpoint_->c_str ());
1561 delete endpoint_;
1562 }
1563
1564 void zmq::socket_base_t::process_pipe_stats_publish (
1565 uint64_t outbound_queue_count_,
1566 uint64_t inbound_queue_count_,
1567 endpoint_uri_pair_t *endpoint_pair_)
1568 {
1569 uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
1570 event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
1571 delete endpoint_pair_;
1572 }
1573
1574 /*
1575 * There are 2 pipes per connection, and the inbound one _must_ be queried from
1576 * the I/O thread. So ask the outbound pipe, in the application thread, to send
1577 * a message (pipe_peer_stats) to its peer. The message will carry the outbound
1578 * pipe stats and endpoint, and the reference to the socket object.
1579 * The inbound pipe on the I/O thread will then add its own stats and endpoint,
1580 * and write back a message to the socket object (pipe_stats_publish) which
1581 * will raise an event with the data.
1582 */
1583 int zmq::socket_base_t::query_pipes_stats ()
1584 {
1585 {
1586 scoped_lock_t lock (_monitor_sync);
1587 if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
1588 errno = EINVAL;
1589 return -1;
1590 }
1591 }
1592 if (_pipes.size () == 0) {
1593 errno = EAGAIN;
1594 return -1;
1595 }
1596 for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1597 _pipes[i]->send_stats_to_peer (this);
1598 }
1599
1600 return 0;
1601 }
1602
1603 void zmq::socket_base_t::update_pipe_options (int option_)
1604 {
1605 if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
1606 for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1607 _pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
1608 _pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
1609 }
1610 }
1611 }
1612
void zmq::socket_base_t::process_destroy ()
{
    //  Final command in the shutdown sequence: mark the socket so that
    //  check_destroy () performs the actual deallocation.
    _destroyed = true;
}
1617
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    //  Default implementation: no socket-type-specific options.
    errno = EINVAL;
    return -1;
}

bool zmq::socket_base_t::xhas_out ()
{
    //  Default implementation: the socket type reports no send readiness.
    return false;
}

int zmq::socket_base_t::xsend (msg_t *)
{
    //  Default implementation: sending is not supported.
    errno = ENOTSUP;
    return -1;
}

bool zmq::socket_base_t::xhas_in ()
{
    //  Default implementation: the socket type reports no receive readiness.
    return false;
}

int zmq::socket_base_t::xjoin (const char *group_)
{
    //  Default implementation: joining a group is not supported.
    LIBZMQ_UNUSED (group_);
    errno = ENOTSUP;
    return -1;
}

int zmq::socket_base_t::xleave (const char *group_)
{
    //  Default implementation: leaving a group is not supported.
    LIBZMQ_UNUSED (group_);
    errno = ENOTSUP;
    return -1;
}

int zmq::socket_base_t::xrecv (msg_t *)
{
    //  Default implementation: receiving is not supported.
    errno = ENOTSUP;
    return -1;
}

void zmq::socket_base_t::xread_activated (pipe_t *)
{
    //  Socket types that receive must override this; reaching it is a bug.
    zmq_assert (false);
}
//  Socket types that send must override this; reaching it is a bug.
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    zmq_assert (false);
}

void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    //  Socket types with per-pipe state must override this; reaching it is
    //  a bug.
    zmq_assert (false);
}
1673
void zmq::socket_base_t::in_event ()
{
    // This function is invoked only once the socket is running in the context
    // of the reaper thread. Process any commands from other threads/sockets
    // that may be available at the moment. Ultimately, the socket will
    // be destroyed.
    {
        scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

        // If the socket is thread safe we need to unsignal the reaper signaler
        if (_thread_safe)
            _reaper_signaler->recv ();

        process_commands (0, false);
    }
    //  Destruction is checked only after the optional lock above has been
    //  released.
    check_destroy ();
}
1691
void zmq::socket_base_t::out_event ()
{
    //  The socket is registered with the reaper poller for input only
    //  (see start_reaping ()); an out event must never fire.
    zmq_assert (false);
}

void zmq::socket_base_t::timer_event (int)
{
    //  No timers are ever registered for a socket object.
    zmq_assert (false);
}
1701
void zmq::socket_base_t::check_destroy ()
{
    //  Completes the socket's destruction once process_destroy () has run;
    //  otherwise this is a no-op.
    // If the object was already marked as destroyed, finish the deallocation.
    if (_destroyed) {
        // Remove the socket from the reaper's poller.
        _poller->rm_fd (_handle);

        // Remove the socket from the context.
        destroy_socket (this);

        // Notify the reaper about the fact.
        send_reaped ();

        // Deallocate.
        own_t::process_destroy ();
    }
}
1719
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    //  A pipe became readable again; forward to the socket type.
    xread_activated (pipe_);
}

void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    //  A pipe became writable again; forward to the socket type.
    xwrite_activated (pipe_);
}
1729
1730 void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
1731 {
1732 if (options.immediate == 1)
1733 pipe_->terminate (false);
1734 else
1735 // Notify derived sockets of the hiccup
1736 xhiccuped (pipe_);
1737 }
1738
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Pipe termination callback: removes every reference this socket
    //  holds to pipe_ and, when shutting down, accounts one term ack.
    // Notify the specific socket type about the pipe termination.
    xpipe_terminated (pipe_);

    // Remove pipe from inproc pipes
    _inprocs.erase_pipe (pipe_);

    // Remove the pipe from the list of attached pipes and confirm its
    // termination if we are already shutting down.
    _pipes.erase (pipe_);

    // Remove the pipe from _endpoints (set it to NULL).
    const std::string &identifier = pipe_->get_endpoint_pair ().identifier ();
    if (!identifier.empty ()) {
        std::pair<endpoints_t::iterator, endpoints_t::iterator> range;
        range = _endpoints.equal_range (identifier);

        for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
            if (it->second.second == pipe_) {
                it->second.second = NULL;
                break;
            }
        }
    }

    if (is_terminating ())
        unregister_term_ack ();
}
1768
1769 void zmq::socket_base_t::extract_flags (const msg_t *msg_)
1770 {
1771 // Test whether routing_id flag is valid for this socket type.
1772 if (unlikely (msg_->flags () & msg_t::routing_id))
1773 zmq_assert (options.recv_routing_id);
1774
1775 // Remove MORE flag.
1776 _rcvmore = (msg_->flags () & msg_t::more) != 0;
1777 }
1778
1779 int zmq::socket_base_t::monitor (const char *endpoint_,
1780 uint64_t events_,
1781 int event_version_,
1782 int type_)
1783 {
1784 scoped_lock_t lock (_monitor_sync);
1785
1786 if (unlikely (_ctx_terminated)) {
1787 errno = ETERM;
1788 return -1;
1789 }
1790
1791 // Event version 1 supports only first 16 events.
1792 if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
1793 errno = EINVAL;
1794 return -1;
1795 }
1796
1797 // Support deregistering monitoring endpoints as well
1798 if (endpoint_ == NULL) {
1799 stop_monitor ();
1800 return 0;
1801 }
1802 // Parse endpoint_uri_ string.
1803 std::string protocol;
1804 std::string address;
1805 if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
1806 return -1;
1807
1808 // Event notification only supported over inproc://
1809 if (protocol != protocol_name::inproc) {
1810 errno = EPROTONOSUPPORT;
1811 return -1;
1812 }
1813
1814 // already monitoring. Stop previous monitor before starting new one.
1815 if (_monitor_socket != NULL) {
1816 stop_monitor (true);
1817 }
1818
1819 // Check if the specified socket type is supported. It must be a
1820 // one-way socket types that support the SNDMORE flag.
1821 switch (type_) {
1822 case ZMQ_PAIR:
1823 break;
1824 case ZMQ_PUB:
1825 break;
1826 case ZMQ_PUSH:
1827 break;
1828 default:
1829 errno = EINVAL;
1830 return -1;
1831 }
1832
1833 // Register events to monitor
1834 _monitor_events = events_;
1835 options.monitor_event_version = event_version_;
1836 // Create a monitor socket of the specified type.
1837 _monitor_socket = zmq_socket (get_ctx (), type_);
1838 if (_monitor_socket == NULL)
1839 return -1;
1840
1841 // Never block context termination on pending event messages
1842 int linger = 0;
1843 int rc =
1844 zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1845 if (rc == -1)
1846 stop_monitor (false);
1847
1848 // Spawn the monitor socket endpoint
1849 rc = zmq_bind (_monitor_socket, endpoint_);
1850 if (rc == -1)
1851 stop_monitor (false);
1852 return rc;
1853 }
1854
1855 void zmq::socket_base_t::event_connected (
1856 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1857 {
1858 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1859 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
1860 }
1861
1862 void zmq::socket_base_t::event_connect_delayed (
1863 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1864 {
1865 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1866 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
1867 }
1868
1869 void zmq::socket_base_t::event_connect_retried (
1870 const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
1871 {
1872 uint64_t values[1] = {static_cast<uint64_t> (interval_)};
1873 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
1874 }
1875
1876 void zmq::socket_base_t::event_listening (
1877 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1878 {
1879 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1880 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
1881 }
1882
1883 void zmq::socket_base_t::event_bind_failed (
1884 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1885 {
1886 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1887 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
1888 }
1889
1890 void zmq::socket_base_t::event_accepted (
1891 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1892 {
1893 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1894 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
1895 }
1896
1897 void zmq::socket_base_t::event_accept_failed (
1898 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1899 {
1900 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1901 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
1902 }
1903
1904 void zmq::socket_base_t::event_closed (
1905 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1906 {
1907 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1908 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
1909 }
1910
1911 void zmq::socket_base_t::event_close_failed (
1912 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1913 {
1914 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1915 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
1916 }
1917
1918 void zmq::socket_base_t::event_disconnected (
1919 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1920 {
1921 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1922 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
1923 }
1924
1925 void zmq::socket_base_t::event_handshake_failed_no_detail (
1926 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1927 {
1928 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1929 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
1930 }
1931
1932 void zmq::socket_base_t::event_handshake_failed_protocol (
1933 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1934 {
1935 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1936 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1937 }
1938
1939 void zmq::socket_base_t::event_handshake_failed_auth (
1940 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1941 {
1942 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1943 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1944 }
1945
1946 void zmq::socket_base_t::event_handshake_succeeded (
1947 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1948 {
1949 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1950 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
1951 }
1952
1953 void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
1954 uint64_t values_[],
1955 uint64_t values_count_,
1956 uint64_t type_)
1957 {
1958 scoped_lock_t lock (_monitor_sync);
1959 if (_monitor_events & type_) {
1960 monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
1961 }
1962 }
1963
// Send a monitor event
void zmq::socket_base_t::monitor_event (
  uint64_t event_,
  const uint64_t values_[],
  uint64_t values_count_,
  const endpoint_uri_pair_t &endpoint_uri_pair_) const
{
    // this is a private method which is only called from
    // contexts where the _monitor_sync mutex has been locked before

    //  Serialises the event into the wire format selected by
    //  options.monitor_event_version and sends the frames on
    //  _monitor_socket. A NULL monitor socket makes this a no-op.
    if (_monitor_socket) {
        zmq_msg_t msg;

        switch (options.monitor_event_version) {
            case 1: {
                // The API should not allow to activate unsupported events
                zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
                // v1 only allows one value
                zmq_assert (values_count_ == 1);
                zmq_assert (values_[0]
                            <= std::numeric_limits<uint32_t>::max ());

                // Send event and value in first frame
                const uint16_t event = static_cast<uint16_t> (event_);
                const uint32_t value = static_cast<uint32_t> (values_[0]);
                zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
                uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
                // Avoid dereferencing uint32_t on unaligned address
                memcpy (data + 0, &event, sizeof (event));
                memcpy (data + sizeof (event), &value, sizeof (value));
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                const std::string &endpoint_uri =
                  endpoint_uri_pair_.identifier ();

                // Send address in second frame
                zmq_msg_init_size (&msg, endpoint_uri.size ());
                memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (),
                        endpoint_uri.size ());
                zmq_msg_send (&msg, _monitor_socket, 0);
            } break;
            case 2: {
                // Send event in first frame (64bit unsigned)
                zmq_msg_init_size (&msg, sizeof (event_));
                memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                // Send number of values that will follow in second frame
                zmq_msg_init_size (&msg, sizeof (values_count_));
                memcpy (zmq_msg_data (&msg), &values_count_,
                        sizeof (values_count_));
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                // Send values in third-Nth frames (64bit unsigned)
                for (uint64_t i = 0; i < values_count_; ++i) {
                    zmq_msg_init_size (&msg, sizeof (values_[i]));
                    memcpy (zmq_msg_data (&msg), &values_[i],
                            sizeof (values_[i]));
                    zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
                }

                // Send local endpoint URI in second-to-last frame (string)
                zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
                memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
                        endpoint_uri_pair_.local.size ());
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                // Send remote endpoint URI in last frame (string)
                zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
                memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
                        endpoint_uri_pair_.remote.size ());
                zmq_msg_send (&msg, _monitor_socket, 0);
            } break;
        }
    }
}
2040
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
{
    // this is a private method which is only called from
    // contexts where the _monitor_sync mutex has been locked before

    //  Closes the monitor socket (optionally emitting a final
    //  ZMQ_EVENT_MONITOR_STOPPED event first) and clears monitor state.
    //  No-op when no monitor is active.
    if (_monitor_socket) {
        if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
            && send_monitor_stopped_event_) {
            uint64_t values[1] = {0};
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
                           endpoint_uri_pair_t ());
        }
        zmq_close (_monitor_socket);
        _monitor_socket = NULL;
        _monitor_events = 0;
    }
}
2058
bool zmq::socket_base_t::is_disconnected () const
{
    //  True once an endpoint was terminated while
    //  ZMQ_RECONNECT_STOP_AFTER_DISCONNECT was set (see term_endpoint ()).
    return _disconnected;
}
2063
zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
                                                   uint32_t tid_,
                                                   int sid_) :
    socket_base_t (parent_, tid_, sid_)
{
    //  Base class for socket types that keep a routing-id -> pipe table
    //  (_out_pipes); all construction work happens in socket_base_t.
}
2070
// Destructor: by this point every out-pipe must already have been
// removed (via erase_out_pipe/try_erase_out_pipe) by the shutdown path.
zmq::routing_socket_base_t::~routing_socket_base_t ()
{
    zmq_assert (_out_pipes.empty ());
}
2075
2076 int zmq::routing_socket_base_t::xsetsockopt (int option_,
2077 const void *optval_,
2078 size_t optvallen_)
2079 {
2080 switch (option_) {
2081 case ZMQ_CONNECT_ROUTING_ID:
2082 // TODO why isn't it possible to set an empty connect_routing_id
2083 // (which is the default value)
2084 if (optval_ && optvallen_) {
2085 _connect_routing_id.assign (static_cast<const char *> (optval_),
2086 optvallen_);
2087 return 0;
2088 }
2089 break;
2090 }
2091 errno = EINVAL;
2092 return -1;
2093 }
2094
2095 void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
2096 {
2097 const out_pipes_t::iterator end = _out_pipes.end ();
2098 out_pipes_t::iterator it;
2099 for (it = _out_pipes.begin (); it != end; ++it)
2100 if (it->second.pipe == pipe_)
2101 break;
2102
2103 zmq_assert (it != end);
2104 zmq_assert (!it->second.active);
2105 it->second.active = true;
2106 }
2107
2108 std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
2109 {
2110 std::string res = ZMQ_MOVE (_connect_routing_id);
2111 _connect_routing_id.clear ();
2112 return res;
2113 }
2114
2115 bool zmq::routing_socket_base_t::connect_routing_id_is_set () const
2116 {
2117 return !_connect_routing_id.empty ();
2118 }
2119
2120 void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
2121 pipe_t *pipe_)
2122 {
2123 // Add the record into output pipes lookup table
2124 const out_pipe_t outpipe = {pipe_, true};
2125 const bool ok =
2126 _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe)
2127 .second;
2128 zmq_assert (ok);
2129 }
2130
2131 bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
2132 {
2133 return 0 != _out_pipes.count (routing_id_);
2134 }
2135
2136 zmq::routing_socket_base_t::out_pipe_t *
2137 zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_)
2138 {
2139 // TODO we could probably avoid constructor a temporary blob_t to call this function
2140 out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2141 return it == _out_pipes.end () ? NULL : &it->second;
2142 }
2143
2144 const zmq::routing_socket_base_t::out_pipe_t *
2145 zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
2146 {
2147 // TODO we could probably avoid constructor a temporary blob_t to call this function
2148 const out_pipes_t::const_iterator it = _out_pipes.find (routing_id_);
2149 return it == _out_pipes.end () ? NULL : &it->second;
2150 }
2151
2152 void zmq::routing_socket_base_t::erase_out_pipe (const pipe_t *pipe_)
2153 {
2154 const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
2155 zmq_assert (erased);
2156 }
2157
2158 zmq::routing_socket_base_t::out_pipe_t
2159 zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id_)
2160 {
2161 const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2162 out_pipe_t res = {NULL, false};
2163 if (it != _out_pipes.end ()) {
2164 res = it->second;
2165 _out_pipes.erase (it);
2166 }
2167 return res;
2168 }
2169