1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 #include <algorithm>
34 #include <limits>
35 
36 #include "macros.hpp"
37 
38 #if defined ZMQ_HAVE_WINDOWS
39 #if defined _MSC_VER
40 #if defined _WIN32_WCE
41 #include <cmnintrin.h>
42 #else
43 #include <intrin.h>
44 #endif
45 #endif
46 #else
47 #include <unistd.h>
48 #include <ctype.h>
49 #endif
50 
51 #include "socket_base.hpp"
52 #include "tcp_listener.hpp"
53 #include "ws_listener.hpp"
54 #include "ipc_listener.hpp"
55 #include "tipc_listener.hpp"
56 #include "tcp_connecter.hpp"
57 #ifdef ZMQ_HAVE_WS
58 #include "ws_address.hpp"
59 #endif
60 #include "io_thread.hpp"
61 #include "session_base.hpp"
62 #include "config.hpp"
63 #include "pipe.hpp"
64 #include "err.hpp"
65 #include "ctx.hpp"
66 #include "likely.hpp"
67 #include "msg.hpp"
68 #include "address.hpp"
69 #include "ipc_address.hpp"
70 #include "tcp_address.hpp"
71 #include "udp_address.hpp"
72 #include "tipc_address.hpp"
73 #include "mailbox.hpp"
74 #include "mailbox_safe.hpp"
75 
76 #ifdef ZMQ_HAVE_WSS
77 #include "wss_address.hpp"
78 #endif
79 #if defined ZMQ_HAVE_VMCI
80 #include "vmci_address.hpp"
81 #include "vmci_listener.hpp"
82 #endif
83 
84 #ifdef ZMQ_HAVE_OPENPGM
85 #include "pgm_socket.hpp"
86 #endif
87 
88 #include "pair.hpp"
89 #include "pub.hpp"
90 #include "sub.hpp"
91 #include "req.hpp"
92 #include "rep.hpp"
93 #include "pull.hpp"
94 #include "push.hpp"
95 #include "dealer.hpp"
96 #include "router.hpp"
97 #include "xpub.hpp"
98 #include "xsub.hpp"
99 #include "stream.hpp"
100 #include "server.hpp"
101 #include "client.hpp"
102 #include "radio.hpp"
103 #include "dish.hpp"
104 #include "gather.hpp"
105 #include "scatter.hpp"
106 #include "dgram.hpp"
107 #include "peer.hpp"
108 #include "channel.hpp"
109 
emplace(const char * endpoint_uri_,pipe_t * pipe_)110 void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
111                                              pipe_t *pipe_)
112 {
113     _inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
114 }
115 
erase_pipes(const std::string & endpoint_uri_str_)116 int zmq::socket_base_t::inprocs_t::erase_pipes (
117   const std::string &endpoint_uri_str_)
118 {
119     const std::pair<map_t::iterator, map_t::iterator> range =
120       _inprocs.equal_range (endpoint_uri_str_);
121     if (range.first == range.second) {
122         errno = ENOENT;
123         return -1;
124     }
125 
126     for (map_t::iterator it = range.first; it != range.second; ++it) {
127         it->second->send_disconnect_msg ();
128         it->second->terminate (true);
129     }
130     _inprocs.erase (range.first, range.second);
131     return 0;
132 }
133 
erase_pipe(const pipe_t * pipe_)134 void zmq::socket_base_t::inprocs_t::erase_pipe (const pipe_t *pipe_)
135 {
136     for (map_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
137          it != end; ++it)
138         if (it->second == pipe_) {
139             _inprocs.erase (it);
140             break;
141         }
142 }
143 
check_tag() const144 bool zmq::socket_base_t::check_tag () const
145 {
146     return _tag == 0xbaddecaf;
147 }
148 
is_thread_safe() const149 bool zmq::socket_base_t::is_thread_safe () const
150 {
151     return _thread_safe;
152 }
153 
create(int type_,class ctx_t * parent_,uint32_t tid_,int sid_)154 zmq::socket_base_t *zmq::socket_base_t::create (int type_,
155                                                 class ctx_t *parent_,
156                                                 uint32_t tid_,
157                                                 int sid_)
158 {
159     socket_base_t *s = NULL;
160     switch (type_) {
161         case ZMQ_PAIR:
162             s = new (std::nothrow) pair_t (parent_, tid_, sid_);
163             break;
164         case ZMQ_PUB:
165             s = new (std::nothrow) pub_t (parent_, tid_, sid_);
166             break;
167         case ZMQ_SUB:
168             s = new (std::nothrow) sub_t (parent_, tid_, sid_);
169             break;
170         case ZMQ_REQ:
171             s = new (std::nothrow) req_t (parent_, tid_, sid_);
172             break;
173         case ZMQ_REP:
174             s = new (std::nothrow) rep_t (parent_, tid_, sid_);
175             break;
176         case ZMQ_DEALER:
177             s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
178             break;
179         case ZMQ_ROUTER:
180             s = new (std::nothrow) router_t (parent_, tid_, sid_);
181             break;
182         case ZMQ_PULL:
183             s = new (std::nothrow) pull_t (parent_, tid_, sid_);
184             break;
185         case ZMQ_PUSH:
186             s = new (std::nothrow) push_t (parent_, tid_, sid_);
187             break;
188         case ZMQ_XPUB:
189             s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
190             break;
191         case ZMQ_XSUB:
192             s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
193             break;
194         case ZMQ_STREAM:
195             s = new (std::nothrow) stream_t (parent_, tid_, sid_);
196             break;
197         case ZMQ_SERVER:
198             s = new (std::nothrow) server_t (parent_, tid_, sid_);
199             break;
200         case ZMQ_CLIENT:
201             s = new (std::nothrow) client_t (parent_, tid_, sid_);
202             break;
203         case ZMQ_RADIO:
204             s = new (std::nothrow) radio_t (parent_, tid_, sid_);
205             break;
206         case ZMQ_DISH:
207             s = new (std::nothrow) dish_t (parent_, tid_, sid_);
208             break;
209         case ZMQ_GATHER:
210             s = new (std::nothrow) gather_t (parent_, tid_, sid_);
211             break;
212         case ZMQ_SCATTER:
213             s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
214             break;
215         case ZMQ_DGRAM:
216             s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
217             break;
218         case ZMQ_PEER:
219             s = new (std::nothrow) peer_t (parent_, tid_, sid_);
220             break;
221         case ZMQ_CHANNEL:
222             s = new (std::nothrow) channel_t (parent_, tid_, sid_);
223             break;
224         default:
225             errno = EINVAL;
226             return NULL;
227     }
228 
229     alloc_assert (s);
230 
231     if (s->_mailbox == NULL) {
232         s->_destroyed = true;
233         LIBZMQ_DELETE (s);
234         return NULL;
235     }
236 
237     return s;
238 }
239 
socket_base_t(ctx_t * parent_,uint32_t tid_,int sid_,bool thread_safe_)240 zmq::socket_base_t::socket_base_t (ctx_t *parent_,
241                                    uint32_t tid_,
242                                    int sid_,
243                                    bool thread_safe_) :
244     own_t (parent_, tid_),
245     _sync (),
246     _tag (0xbaddecaf),
247     _ctx_terminated (false),
248     _destroyed (false),
249     _poller (NULL),
250     _handle (static_cast<poller_t::handle_t> (NULL)),
251     _last_tsc (0),
252     _ticks (0),
253     _rcvmore (false),
254     _monitor_socket (NULL),
255     _monitor_events (0),
256     _thread_safe (thread_safe_),
257     _reaper_signaler (NULL),
258     _monitor_sync ()
259 {
260     options.socket_id = sid_;
261     options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
262     options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
263     options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
264 
265     if (_thread_safe) {
266         _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
267         zmq_assert (_mailbox);
268     } else {
269         mailbox_t *m = new (std::nothrow) mailbox_t ();
270         zmq_assert (m);
271 
272         if (m->get_fd () != retired_fd)
273             _mailbox = m;
274         else {
275             LIBZMQ_DELETE (m);
276             _mailbox = NULL;
277         }
278     }
279 }
280 
get_peer_state(const void * routing_id_,size_t routing_id_size_) const281 int zmq::socket_base_t::get_peer_state (const void *routing_id_,
282                                         size_t routing_id_size_) const
283 {
284     LIBZMQ_UNUSED (routing_id_);
285     LIBZMQ_UNUSED (routing_id_size_);
286 
287     //  Only ROUTER sockets support this
288     errno = ENOTSUP;
289     return -1;
290 }
291 
~socket_base_t()292 zmq::socket_base_t::~socket_base_t ()
293 {
294     if (_mailbox)
295         LIBZMQ_DELETE (_mailbox);
296 
297     if (_reaper_signaler)
298         LIBZMQ_DELETE (_reaper_signaler);
299 
300     scoped_lock_t lock (_monitor_sync);
301     stop_monitor ();
302 
303     zmq_assert (_destroyed);
304 }
305 
get_mailbox() const306 zmq::i_mailbox *zmq::socket_base_t::get_mailbox () const
307 {
308     return _mailbox;
309 }
310 
stop()311 void zmq::socket_base_t::stop ()
312 {
313     //  Called by ctx when it is terminated (zmq_ctx_term).
314     //  'stop' command is sent from the threads that called zmq_ctx_term to
315     //  the thread owning the socket. This way, blocking call in the
316     //  owner thread can be interrupted.
317     send_stop ();
318 }
319 
320 // TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
321 // terminology, but this requires extensive changes to be consistent
parse_uri(const char * uri_,std::string & protocol_,std::string & path_)322 int zmq::socket_base_t::parse_uri (const char *uri_,
323                                    std::string &protocol_,
324                                    std::string &path_)
325 {
326     zmq_assert (uri_ != NULL);
327 
328     const std::string uri (uri_);
329     const std::string::size_type pos = uri.find ("://");
330     if (pos == std::string::npos) {
331         errno = EINVAL;
332         return -1;
333     }
334     protocol_ = uri.substr (0, pos);
335     path_ = uri.substr (pos + 3);
336 
337     if (protocol_.empty () || path_.empty ()) {
338         errno = EINVAL;
339         return -1;
340     }
341     return 0;
342 }
343 
check_protocol(const std::string & protocol_) const344 int zmq::socket_base_t::check_protocol (const std::string &protocol_) const
345 {
346     //  First check out whether the protocol is something we are aware of.
347     if (protocol_ != protocol_name::inproc
348 #if defined ZMQ_HAVE_IPC
349         && protocol_ != protocol_name::ipc
350 #endif
351         && protocol_ != protocol_name::tcp
352 #ifdef ZMQ_HAVE_WS
353         && protocol_ != protocol_name::ws
354 #endif
355 #ifdef ZMQ_HAVE_WSS
356         && protocol_ != protocol_name::wss
357 #endif
358 #if defined ZMQ_HAVE_OPENPGM
359         //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
360         && protocol_ != protocol_name::pgm
361         && protocol_ != protocol_name::epgm
362 #endif
363 #if defined ZMQ_HAVE_TIPC
364         // TIPC transport is only available on Linux.
365         && protocol_ != protocol_name::tipc
366 #endif
367 #if defined ZMQ_HAVE_NORM
368         && protocol_ != protocol_name::norm
369 #endif
370 #if defined ZMQ_HAVE_VMCI
371         && protocol_ != protocol_name::vmci
372 #endif
373         && protocol_ != protocol_name::udp) {
374         errno = EPROTONOSUPPORT;
375         return -1;
376     }
377 
378     //  Check whether socket type and transport protocol match.
379     //  Specifically, multicast protocols can't be combined with
380     //  bi-directional messaging patterns (socket types).
381 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
382 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
383     if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm
384          || protocol_ == protocol_name::norm)
385 #elif defined ZMQ_HAVE_OPENPGM
386     if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm)
387 #else // defined ZMQ_HAVE_NORM
388     if (protocol_ == protocol_name::norm
389 #endif
390         && options.type != ZMQ_PUB && options.type != ZMQ_SUB
391         && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
392         errno = ENOCOMPATPROTO;
393         return -1;
394     }
395 #endif
396 
397     if (protocol_ == protocol_name::udp
398         && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
399             && options.type != ZMQ_DGRAM)) {
400         errno = ENOCOMPATPROTO;
401         return -1;
402     }
403 
404     //  Protocol is available.
405     return 0;
406 }
407 
attach_pipe(pipe_t * pipe_,bool subscribe_to_all_,bool locally_initiated_)408 void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
409                                       bool subscribe_to_all_,
410                                       bool locally_initiated_)
411 {
412     //  First, register the pipe so that we can terminate it later on.
413     pipe_->set_event_sink (this);
414     _pipes.push_back (pipe_);
415 
416     //  Let the derived socket type know about new pipe.
417     xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
418 
419     //  If the socket is already being closed, ask any new pipes to terminate
420     //  straight away.
421     if (is_terminating ()) {
422         register_term_acks (1);
423         pipe_->terminate (false);
424     }
425 }
426 
setsockopt(int option_,const void * optval_,size_t optvallen_)427 int zmq::socket_base_t::setsockopt (int option_,
428                                     const void *optval_,
429                                     size_t optvallen_)
430 {
431     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
432 
433     if (unlikely (_ctx_terminated)) {
434         errno = ETERM;
435         return -1;
436     }
437 
438     //  First, check whether specific socket type overloads the option.
439     int rc = xsetsockopt (option_, optval_, optvallen_);
440     if (rc == 0 || errno != EINVAL) {
441         return rc;
442     }
443 
444     //  If the socket type doesn't support the option, pass it to
445     //  the generic option parser.
446     rc = options.setsockopt (option_, optval_, optvallen_);
447     update_pipe_options (option_);
448 
449     return rc;
450 }
451 
getsockopt(int option_,void * optval_,size_t * optvallen_)452 int zmq::socket_base_t::getsockopt (int option_,
453                                     void *optval_,
454                                     size_t *optvallen_)
455 {
456     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
457 
458     if (unlikely (_ctx_terminated)) {
459         errno = ETERM;
460         return -1;
461     }
462 
463     if (option_ == ZMQ_RCVMORE) {
464         return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
465     }
466 
467     if (option_ == ZMQ_FD) {
468         if (_thread_safe) {
469             // thread safe socket doesn't provide file descriptor
470             errno = EINVAL;
471             return -1;
472         }
473 
474         return do_getsockopt<fd_t> (
475           optval_, optvallen_,
476           (static_cast<mailbox_t *> (_mailbox))->get_fd ());
477     }
478 
479     if (option_ == ZMQ_EVENTS) {
480         const int rc = process_commands (0, false);
481         if (rc != 0 && (errno == EINTR || errno == ETERM)) {
482             return -1;
483         }
484         errno_assert (rc == 0);
485 
486         return do_getsockopt<int> (optval_, optvallen_,
487                                    (has_out () ? ZMQ_POLLOUT : 0)
488                                      | (has_in () ? ZMQ_POLLIN : 0));
489     }
490 
491     if (option_ == ZMQ_LAST_ENDPOINT) {
492         return do_getsockopt (optval_, optvallen_, _last_endpoint);
493     }
494 
495     if (option_ == ZMQ_THREAD_SAFE) {
496         return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
497     }
498 
499     return options.getsockopt (option_, optval_, optvallen_);
500 }
501 
join(const char * group_)502 int zmq::socket_base_t::join (const char *group_)
503 {
504     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
505 
506     return xjoin (group_);
507 }
508 
leave(const char * group_)509 int zmq::socket_base_t::leave (const char *group_)
510 {
511     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
512 
513     return xleave (group_);
514 }
515 
add_signaler(signaler_t * s_)516 void zmq::socket_base_t::add_signaler (signaler_t *s_)
517 {
518     zmq_assert (_thread_safe);
519 
520     scoped_lock_t sync_lock (_sync);
521     (static_cast<mailbox_safe_t *> (_mailbox))->add_signaler (s_);
522 }
523 
remove_signaler(signaler_t * s_)524 void zmq::socket_base_t::remove_signaler (signaler_t *s_)
525 {
526     zmq_assert (_thread_safe);
527 
528     scoped_lock_t sync_lock (_sync);
529     (static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
530 }
531 
bind(const char * endpoint_uri_)532 int zmq::socket_base_t::bind (const char *endpoint_uri_)
533 {
534     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
535 
536     if (unlikely (_ctx_terminated)) {
537         errno = ETERM;
538         return -1;
539     }
540 
541     //  Process pending commands, if any.
542     int rc = process_commands (0, false);
543     if (unlikely (rc != 0)) {
544         return -1;
545     }
546 
547     //  Parse endpoint_uri_ string.
548     std::string protocol;
549     std::string address;
550     if (parse_uri (endpoint_uri_, protocol, address)
551         || check_protocol (protocol)) {
552         return -1;
553     }
554 
555     if (protocol == protocol_name::inproc) {
556         const endpoint_t endpoint = {this, options};
557         rc = register_endpoint (endpoint_uri_, endpoint);
558         if (rc == 0) {
559             connect_pending (endpoint_uri_, this);
560             _last_endpoint.assign (endpoint_uri_);
561             options.connected = true;
562         }
563         return rc;
564     }
565 
566 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
567 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
568     if (protocol == protocol_name::pgm || protocol == protocol_name::epgm
569         || protocol == protocol_name::norm) {
570 #elif defined ZMQ_HAVE_OPENPGM
571     if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
572 #else // defined ZMQ_HAVE_NORM
573     if (protocol == protocol_name::norm) {
574 #endif
575         //  For convenience's sake, bind can be used interchangeable with
576         //  connect for PGM, EPGM, NORM transports.
577         rc = connect (endpoint_uri_);
578         if (rc != -1)
579             options.connected = true;
580         return rc;
581     }
582 #endif
583 
584     if (protocol == protocol_name::udp) {
585         if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
586             errno = ENOCOMPATPROTO;
587             return -1;
588         }
589 
590         //  Choose the I/O thread to run the session in.
591         io_thread_t *io_thread = choose_io_thread (options.affinity);
592         if (!io_thread) {
593             errno = EMTHREAD;
594             return -1;
595         }
596 
597         address_t *paddr =
598           new (std::nothrow) address_t (protocol, address, this->get_ctx ());
599         alloc_assert (paddr);
600 
601         paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
602         alloc_assert (paddr->resolved.udp_addr);
603         rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
604                                                 options.ipv6);
605         if (rc != 0) {
606             LIBZMQ_DELETE (paddr);
607             return -1;
608         }
609 
610         session_base_t *session =
611           session_base_t::create (io_thread, true, this, options, paddr);
612         errno_assert (session);
613 
614         //  Create a bi-directional pipe.
615         object_t *parents[2] = {this, session};
616         pipe_t *new_pipes[2] = {NULL, NULL};
617 
618         int hwms[2] = {options.sndhwm, options.rcvhwm};
619         bool conflates[2] = {false, false};
620         rc = pipepair (parents, new_pipes, hwms, conflates);
621         errno_assert (rc == 0);
622 
623         //  Attach local end of the pipe to the socket object.
624         attach_pipe (new_pipes[0], true, true);
625         pipe_t *const newpipe = new_pipes[0];
626 
627         //  Attach remote end of the pipe to the session object later on.
628         session->attach_pipe (new_pipes[1]);
629 
630         //  Save last endpoint URI
631         paddr->to_string (_last_endpoint);
632 
633         //  TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases
634         add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (),
635                                            endpoint_type_none),
636                       static_cast<own_t *> (session), newpipe);
637 
638         return 0;
639     }
640 
641     //  Remaining transports require to be run in an I/O thread, so at this
642     //  point we'll choose one.
643     io_thread_t *io_thread = choose_io_thread (options.affinity);
644     if (!io_thread) {
645         errno = EMTHREAD;
646         return -1;
647     }
648 
649     if (protocol == protocol_name::tcp) {
650         tcp_listener_t *listener =
651           new (std::nothrow) tcp_listener_t (io_thread, this, options);
652         alloc_assert (listener);
653         rc = listener->set_local_address (address.c_str ());
654         if (rc != 0) {
655             LIBZMQ_DELETE (listener);
656             event_bind_failed (make_unconnected_bind_endpoint_pair (address),
657                                zmq_errno ());
658             return -1;
659         }
660 
661         // Save last endpoint URI
662         listener->get_local_address (_last_endpoint);
663 
664         add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
665                       static_cast<own_t *> (listener), NULL);
666         options.connected = true;
667         return 0;
668     }
669 
670 #ifdef ZMQ_HAVE_WS
671 #ifdef ZMQ_HAVE_WSS
672     if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
673         ws_listener_t *listener = new (std::nothrow) ws_listener_t (
674           io_thread, this, options, protocol == protocol_name::wss);
675 #else
676     if (protocol == protocol_name::ws) {
677         ws_listener_t *listener =
678           new (std::nothrow) ws_listener_t (io_thread, this, options, false);
679 #endif
680         alloc_assert (listener);
681         rc = listener->set_local_address (address.c_str ());
682         if (rc != 0) {
683             LIBZMQ_DELETE (listener);
684             event_bind_failed (make_unconnected_bind_endpoint_pair (address),
685                                zmq_errno ());
686             return -1;
687         }
688 
689         // Save last endpoint URI
690         listener->get_local_address (_last_endpoint);
691 
692         add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
693                       static_cast<own_t *> (listener), NULL);
694         options.connected = true;
695         return 0;
696     }
697 #endif
698 
699 #if defined ZMQ_HAVE_IPC
700     if (protocol == protocol_name::ipc) {
701         ipc_listener_t *listener =
702           new (std::nothrow) ipc_listener_t (io_thread, this, options);
703         alloc_assert (listener);
704         int rc = listener->set_local_address (address.c_str ());
705         if (rc != 0) {
706             LIBZMQ_DELETE (listener);
707             event_bind_failed (make_unconnected_bind_endpoint_pair (address),
708                                zmq_errno ());
709             return -1;
710         }
711 
712         // Save last endpoint URI
713         listener->get_local_address (_last_endpoint);
714 
715         add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
716                       static_cast<own_t *> (listener), NULL);
717         options.connected = true;
718         return 0;
719     }
720 #endif
721 #if defined ZMQ_HAVE_TIPC
722     if (protocol == protocol_name::tipc) {
723         tipc_listener_t *listener =
724           new (std::nothrow) tipc_listener_t (io_thread, this, options);
725         alloc_assert (listener);
726         int rc = listener->set_local_address (address.c_str ());
727         if (rc != 0) {
728             LIBZMQ_DELETE (listener);
729             event_bind_failed (make_unconnected_bind_endpoint_pair (address),
730                                zmq_errno ());
731             return -1;
732         }
733 
734         // Save last endpoint URI
735         listener->get_local_address (_last_endpoint);
736 
737         // TODO shouldn't this use _last_endpoint as in the other cases?
738         add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_),
739                       static_cast<own_t *> (listener), NULL);
740         options.connected = true;
741         return 0;
742     }
743 #endif
744 #if defined ZMQ_HAVE_VMCI
745     if (protocol == protocol_name::vmci) {
746         vmci_listener_t *listener =
747           new (std::nothrow) vmci_listener_t (io_thread, this, options);
748         alloc_assert (listener);
749         int rc = listener->set_local_address (address.c_str ());
750         if (rc != 0) {
751             LIBZMQ_DELETE (listener);
752             event_bind_failed (make_unconnected_bind_endpoint_pair (address),
753                                zmq_errno ());
754             return -1;
755         }
756 
757         listener->get_local_address (_last_endpoint);
758 
759         add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
760                       static_cast<own_t *> (listener), NULL);
761         options.connected = true;
762         return 0;
763     }
764 #endif
765 
766     zmq_assert (false);
767     return -1;
768 }
769 
770 int zmq::socket_base_t::connect (const char *endpoint_uri_)
771 {
772     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
773     return connect_internal (endpoint_uri_);
774 }
775 
776 int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
777 {
778     if (unlikely (_ctx_terminated)) {
779         errno = ETERM;
780         return -1;
781     }
782 
783     //  Process pending commands, if any.
784     int rc = process_commands (0, false);
785     if (unlikely (rc != 0)) {
786         return -1;
787     }
788 
789     //  Parse endpoint_uri_ string.
790     std::string protocol;
791     std::string address;
792     if (parse_uri (endpoint_uri_, protocol, address)
793         || check_protocol (protocol)) {
794         return -1;
795     }
796 
797     if (protocol == protocol_name::inproc) {
798         //  TODO: inproc connect is specific with respect to creating pipes
799         //  as there's no 'reconnect' functionality implemented. Once that
800         //  is in place we should follow generic pipe creation algorithm.
801 
802         //  Find the peer endpoint.
803         const endpoint_t peer = find_endpoint (endpoint_uri_);
804 
805         // The total HWM for an inproc connection should be the sum of
806         // the binder's HWM and the connector's HWM.
807         const int sndhwm = peer.socket == NULL
808                              ? options.sndhwm
809                              : options.sndhwm != 0 && peer.options.rcvhwm != 0
810                                  ? options.sndhwm + peer.options.rcvhwm
811                                  : 0;
812         const int rcvhwm = peer.socket == NULL
813                              ? options.rcvhwm
814                              : options.rcvhwm != 0 && peer.options.sndhwm != 0
815                                  ? options.rcvhwm + peer.options.sndhwm
816                                  : 0;
817 
818         //  Create a bi-directional pipe to connect the peers.
819         object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
820         pipe_t *new_pipes[2] = {NULL, NULL};
821 
822         const bool conflate = get_effective_conflate_option (options);
823 
824         int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
825         bool conflates[2] = {conflate, conflate};
826         rc = pipepair (parents, new_pipes, hwms, conflates);
827         if (!conflate) {
828             new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
829                                           peer.options.rcvhwm);
830             new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
831         }
832 
833         errno_assert (rc == 0);
834 
835         if (!peer.socket) {
836             //  The peer doesn't exist yet so we don't know whether
837             //  to send the routing id message or not. To resolve this,
838             //  we always send our routing id and drop it later if
839             //  the peer doesn't expect it.
840             send_routing_id (new_pipes[0], options);
841 
842 #ifdef ZMQ_BUILD_DRAFT_API
843             //  If set, send the hello msg of the local socket to the peer.
844             if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
845                 send_hello_msg (new_pipes[0], options);
846             }
847 #endif
848 
849             const endpoint_t endpoint = {this, options};
850             pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
851         } else {
852             //  If required, send the routing id of the local socket to the peer.
853             if (peer.options.recv_routing_id) {
854                 send_routing_id (new_pipes[0], options);
855             }
856 
857             //  If required, send the routing id of the peer to the local socket.
858             if (options.recv_routing_id) {
859                 send_routing_id (new_pipes[1], peer.options);
860             }
861 
862 #ifdef ZMQ_BUILD_DRAFT_API
863             //  If set, send the hello msg of the local socket to the peer.
864             if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
865                 send_hello_msg (new_pipes[0], options);
866             }
867 
868             //  If set, send the hello msg of the peer to the local socket.
869             if (peer.options.can_send_hello_msg
870                 && peer.options.hello_msg.size () > 0) {
871                 send_hello_msg (new_pipes[1], peer.options);
872             }
873 
874             if (peer.options.can_recv_disconnect_msg
875                 && peer.options.disconnect_msg.size () > 0)
876                 new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg);
877 #endif
878 
879             //  Attach remote end of the pipe to the peer socket. Note that peer's
880             //  seqnum was incremented in find_endpoint function. We don't need it
881             //  increased here.
882             send_bind (peer.socket, new_pipes[1], false);
883         }
884 
885         //  Attach local end of the pipe to this socket object.
886         attach_pipe (new_pipes[0], false, true);
887 
888         // Save last endpoint URI
889         _last_endpoint.assign (endpoint_uri_);
890 
891         // remember inproc connections for disconnect
892         _inprocs.emplace (endpoint_uri_, new_pipes[0]);
893 
894         options.connected = true;
895         return 0;
896     }
897     const bool is_single_connect =
898       (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
899        || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
900     if (unlikely (is_single_connect)) {
901         if (0 != _endpoints.count (endpoint_uri_)) {
902             // There is no valid use for multiple connects for SUB-PUB nor
903             // DEALER-ROUTER nor REQ-REP. Multiple connects produces
904             // nonsensical results.
905             return 0;
906         }
907     }
908 
909     //  Choose the I/O thread to run the session in.
910     io_thread_t *io_thread = choose_io_thread (options.affinity);
911     if (!io_thread) {
912         errno = EMTHREAD;
913         return -1;
914     }
915 
916     address_t *paddr =
917       new (std::nothrow) address_t (protocol, address, this->get_ctx ());
918     alloc_assert (paddr);
919 
920     //  Resolve address (if needed by the protocol)
921     if (protocol == protocol_name::tcp) {
922         //  Do some basic sanity checks on tcp:// address syntax
923         //  - hostname starts with digit or letter, with embedded '-' or '.'
924         //  - IPv6 address may contain hex chars and colons.
925         //  - IPv6 link local address may contain % followed by interface name / zone_id
926         //    (Reference: https://tools.ietf.org/html/rfc4007)
927         //  - IPv4 address may contain decimal digits and dots.
928         //  - Address must end in ":port" where port is *, or numeric
929         //  - Address may contain two parts separated by ':'
930         //  Following code is quick and dirty check to catch obvious errors,
931         //  without trying to be fully accurate.
932         const char *check = address.c_str ();
933         if (isalnum (*check) || isxdigit (*check) || *check == '['
934             || *check == ':') {
935             check++;
936             while (isalnum (*check) || isxdigit (*check) || *check == '.'
937                    || *check == '-' || *check == ':' || *check == '%'
938                    || *check == ';' || *check == '[' || *check == ']'
939                    || *check == '_' || *check == '*') {
940                 check++;
941             }
942         }
943         //  Assume the worst, now look for success
944         rc = -1;
945         //  Did we reach the end of the address safely?
946         if (*check == 0) {
947             //  Do we have a valid port string? (cannot be '*' in connect
948             check = strrchr (address.c_str (), ':');
949             if (check) {
950                 check++;
951                 if (*check && (isdigit (*check)))
952                     rc = 0; //  Valid
953             }
954         }
955         if (rc == -1) {
956             errno = EINVAL;
957             LIBZMQ_DELETE (paddr);
958             return -1;
959         }
960         //  Defer resolution until a socket is opened
961         paddr->resolved.tcp_addr = NULL;
962     }
963 #ifdef ZMQ_HAVE_WS
964 #ifdef ZMQ_HAVE_WSS
965     else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
966         if (protocol == protocol_name::wss) {
967             paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
968             alloc_assert (paddr->resolved.wss_addr);
969             rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
970                                                     options.ipv6);
971         } else
972 #else
973     else if (protocol == protocol_name::ws) {
974 #endif
975         {
976             paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
977             alloc_assert (paddr->resolved.ws_addr);
978             rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
979                                                    options.ipv6);
980         }
981 
982         if (rc != 0) {
983             LIBZMQ_DELETE (paddr);
984             return -1;
985         }
986     }
987 #endif
988 
989 #if defined ZMQ_HAVE_IPC
990     else if (protocol == protocol_name::ipc) {
991         paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
992         alloc_assert (paddr->resolved.ipc_addr);
993         int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
994         if (rc != 0) {
995             LIBZMQ_DELETE (paddr);
996             return -1;
997         }
998     }
999 #endif
1000 
1001     if (protocol == protocol_name::udp) {
1002         if (options.type != ZMQ_RADIO) {
1003             errno = ENOCOMPATPROTO;
1004             LIBZMQ_DELETE (paddr);
1005             return -1;
1006         }
1007 
1008         paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
1009         alloc_assert (paddr->resolved.udp_addr);
1010         rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
1011                                                 options.ipv6);
1012         if (rc != 0) {
1013             LIBZMQ_DELETE (paddr);
1014             return -1;
1015         }
1016     }
1017 
1018     // TBD - Should we check address for ZMQ_HAVE_NORM???
1019 
1020 #ifdef ZMQ_HAVE_OPENPGM
1021     if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
1022         struct pgm_addrinfo_t *res = NULL;
1023         uint16_t port_number = 0;
1024         int rc =
1025           pgm_socket_t::init_address (address.c_str (), &res, &port_number);
1026         if (res != NULL)
1027             pgm_freeaddrinfo (res);
1028         if (rc != 0 || port_number == 0) {
1029             return -1;
1030         }
1031     }
1032 #endif
1033 #if defined ZMQ_HAVE_TIPC
1034     else if (protocol == protocol_name::tipc) {
1035         paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
1036         alloc_assert (paddr->resolved.tipc_addr);
1037         int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
1038         if (rc != 0) {
1039             LIBZMQ_DELETE (paddr);
1040             return -1;
1041         }
1042         const sockaddr_tipc *const saddr =
1043           reinterpret_cast<const sockaddr_tipc *> (
1044             paddr->resolved.tipc_addr->addr ());
1045         // Cannot connect to random Port Identity
1046         if (saddr->addrtype == TIPC_ADDR_ID
1047             && paddr->resolved.tipc_addr->is_random ()) {
1048             LIBZMQ_DELETE (paddr);
1049             errno = EINVAL;
1050             return -1;
1051         }
1052     }
1053 #endif
1054 #if defined ZMQ_HAVE_VMCI
1055     else if (protocol == protocol_name::vmci) {
1056         paddr->resolved.vmci_addr =
1057           new (std::nothrow) vmci_address_t (this->get_ctx ());
1058         alloc_assert (paddr->resolved.vmci_addr);
1059         int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
1060         if (rc != 0) {
1061             LIBZMQ_DELETE (paddr);
1062             return -1;
1063         }
1064     }
1065 #endif
1066 
1067     //  Create session.
1068     session_base_t *session =
1069       session_base_t::create (io_thread, true, this, options, paddr);
1070     errno_assert (session);
1071 
1072     //  PGM does not support subscription forwarding; ask for all data to be
1073     //  sent to this pipe. (same for NORM, currently?)
1074 #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
1075     const bool subscribe_to_all =
1076       protocol == protocol_name::pgm || protocol == protocol_name::epgm
1077       || protocol == protocol_name::norm || protocol == protocol_name::udp;
1078 #elif defined ZMQ_HAVE_OPENPGM
1079     const bool subscribe_to_all = protocol == protocol_name::pgm
1080                                   || protocol == protocol_name::epgm
1081                                   || protocol == protocol_name::udp;
1082 #elif defined ZMQ_HAVE_NORM
1083     const bool subscribe_to_all =
1084       protocol == protocol_name::norm || protocol == protocol_name::udp;
1085 #else
1086     const bool subscribe_to_all = protocol == protocol_name::udp;
1087 #endif
1088     pipe_t *newpipe = NULL;
1089 
1090     if (options.immediate != 1 || subscribe_to_all) {
1091         //  Create a bi-directional pipe.
1092         object_t *parents[2] = {this, session};
1093         pipe_t *new_pipes[2] = {NULL, NULL};
1094 
1095         const bool conflate = get_effective_conflate_option (options);
1096 
1097         int hwms[2] = {conflate ? -1 : options.sndhwm,
1098                        conflate ? -1 : options.rcvhwm};
1099         bool conflates[2] = {conflate, conflate};
1100         rc = pipepair (parents, new_pipes, hwms, conflates);
1101         errno_assert (rc == 0);
1102 
1103         //  Attach local end of the pipe to the socket object.
1104         attach_pipe (new_pipes[0], subscribe_to_all, true);
1105         newpipe = new_pipes[0];
1106 
1107         //  Attach remote end of the pipe to the session object later on.
1108         session->attach_pipe (new_pipes[1]);
1109     }
1110 
1111     //  Save last endpoint URI
1112     paddr->to_string (_last_endpoint);
1113 
1114     add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
1115                   static_cast<own_t *> (session), newpipe);
1116     return 0;
1117 }
1118 
1119 std::string
1120 zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_pair_,
1121                                       const char *tcp_address_)
1122 {
1123     // The resolved last_endpoint is used as a key in the endpoints map.
1124     // The address passed by the user might not match in the TCP case due to
1125     // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
1126     // resolve before giving up. Given at this stage we don't know whether a
1127     // socket is connected or bound, try with both.
1128     if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1129         tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
1130         alloc_assert (tcp_addr);
1131         int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
1132 
1133         if (rc == 0) {
1134             tcp_addr->to_string (endpoint_uri_pair_);
1135             if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1136                 rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
1137                 if (rc == 0) {
1138                     tcp_addr->to_string (endpoint_uri_pair_);
1139                 }
1140             }
1141         }
1142         LIBZMQ_DELETE (tcp_addr);
1143     }
1144     return endpoint_uri_pair_;
1145 }
1146 
1147 void zmq::socket_base_t::add_endpoint (
1148   const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
1149 {
1150     //  Activate the session. Make it a child of this socket.
1151     launch_child (endpoint_);
1152     _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
1153                                           endpoint_pipe_t (endpoint_, pipe_));
1154 
1155     if (pipe_ != NULL)
1156         pipe_->set_endpoint_pair (endpoint_pair_);
1157 }
1158 
1159 int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
1160 {
1161     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1162 
1163     //  Check whether the context hasn't been shut down yet.
1164     if (unlikely (_ctx_terminated)) {
1165         errno = ETERM;
1166         return -1;
1167     }
1168 
1169     //  Check whether endpoint address passed to the function is valid.
1170     if (unlikely (!endpoint_uri_)) {
1171         errno = EINVAL;
1172         return -1;
1173     }
1174 
1175     //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
1176     //  (from launch_child() for example) we're asked to terminate now.
1177     const int rc = process_commands (0, false);
1178     if (unlikely (rc != 0)) {
1179         return -1;
1180     }
1181 
1182     //  Parse endpoint_uri_ string.
1183     std::string uri_protocol;
1184     std::string uri_path;
1185     if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
1186         || check_protocol (uri_protocol)) {
1187         return -1;
1188     }
1189 
1190     const std::string endpoint_uri_str = std::string (endpoint_uri_);
1191 
1192     // Disconnect an inproc socket
1193     if (uri_protocol == protocol_name::inproc) {
1194         return unregister_endpoint (endpoint_uri_str, this) == 0
1195                  ? 0
1196                  : _inprocs.erase_pipes (endpoint_uri_str);
1197     }
1198 
1199     const std::string resolved_endpoint_uri =
1200       uri_protocol == protocol_name::tcp
1201         ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
1202         : endpoint_uri_str;
1203 
1204     //  Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string.
1205     const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
1206       _endpoints.equal_range (resolved_endpoint_uri);
1207     if (range.first == range.second) {
1208         errno = ENOENT;
1209         return -1;
1210     }
1211 
1212     for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1213         //  If we have an associated pipe, terminate it.
1214         if (it->second.second != NULL)
1215             it->second.second->terminate (false);
1216         term_child (it->second.first);
1217     }
1218     _endpoints.erase (range.first, range.second);
1219 
1220     if (options.reconnect_stop & ZMQ_RECONNECT_STOP_AFTER_DISCONNECT) {
1221         _disconnected = true;
1222     }
1223 
1224     return 0;
1225 }
1226 
1227 int zmq::socket_base_t::send (msg_t *msg_, int flags_)
1228 {
1229     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1230 
1231     //  Check whether the context hasn't been shut down yet.
1232     if (unlikely (_ctx_terminated)) {
1233         errno = ETERM;
1234         return -1;
1235     }
1236 
1237     //  Check whether message passed to the function is valid.
1238     if (unlikely (!msg_ || !msg_->check ())) {
1239         errno = EFAULT;
1240         return -1;
1241     }
1242 
1243     //  Process pending commands, if any.
1244     int rc = process_commands (0, true);
1245     if (unlikely (rc != 0)) {
1246         return -1;
1247     }
1248 
1249     //  Clear any user-visible flags that are set on the message.
1250     msg_->reset_flags (msg_t::more);
1251 
1252     //  At this point we impose the flags on the message.
1253     if (flags_ & ZMQ_SNDMORE)
1254         msg_->set_flags (msg_t::more);
1255 
1256     msg_->reset_metadata ();
1257 
1258     //  Try to send the message using method in each socket class
1259     rc = xsend (msg_);
1260     if (rc == 0) {
1261         return 0;
1262     }
1263     //  Special case for ZMQ_PUSH: -2 means pipe is dead while a
1264     //  multi-part send is in progress and can't be recovered, so drop
1265     //  silently when in blocking mode to keep backward compatibility.
1266     if (unlikely (rc == -2)) {
1267         if (!((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)) {
1268             rc = msg_->close ();
1269             errno_assert (rc == 0);
1270             rc = msg_->init ();
1271             errno_assert (rc == 0);
1272             return 0;
1273         }
1274     }
1275     if (unlikely (errno != EAGAIN)) {
1276         return -1;
1277     }
1278 
1279     //  In case of non-blocking send we'll simply propagate
1280     //  the error - including EAGAIN - up the stack.
1281     if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
1282         return -1;
1283     }
1284 
1285     //  Compute the time when the timeout should occur.
1286     //  If the timeout is infinite, don't care.
1287     int timeout = options.sndtimeo;
1288     const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
1289 
1290     //  Oops, we couldn't send the message. Wait for the next
1291     //  command, process it and try to send the message again.
1292     //  If timeout is reached in the meantime, return EAGAIN.
1293     while (true) {
1294         if (unlikely (process_commands (timeout, false) != 0)) {
1295             return -1;
1296         }
1297         rc = xsend (msg_);
1298         if (rc == 0)
1299             break;
1300         if (unlikely (errno != EAGAIN)) {
1301             return -1;
1302         }
1303         if (timeout > 0) {
1304             timeout = static_cast<int> (end - _clock.now_ms ());
1305             if (timeout <= 0) {
1306                 errno = EAGAIN;
1307                 return -1;
1308             }
1309         }
1310     }
1311 
1312     return 0;
1313 }
1314 
1315 int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1316 {
1317     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1318 
1319     //  Check whether the context hasn't been shut down yet.
1320     if (unlikely (_ctx_terminated)) {
1321         errno = ETERM;
1322         return -1;
1323     }
1324 
1325     //  Check whether message passed to the function is valid.
1326     if (unlikely (!msg_ || !msg_->check ())) {
1327         errno = EFAULT;
1328         return -1;
1329     }
1330 
1331     //  Once every inbound_poll_rate messages check for signals and process
1332     //  incoming commands. This happens only if we are not polling altogether
1333     //  because there are messages available all the time. If poll occurs,
1334     //  ticks is set to zero and thus we avoid this code.
1335     //
1336     //  Note that 'recv' uses different command throttling algorithm (the one
1337     //  described above) from the one used by 'send'. This is because counting
1338     //  ticks is more efficient than doing RDTSC all the time.
1339     if (++_ticks == inbound_poll_rate) {
1340         if (unlikely (process_commands (0, false) != 0)) {
1341             return -1;
1342         }
1343         _ticks = 0;
1344     }
1345 
1346     //  Get the message.
1347     int rc = xrecv (msg_);
1348     if (unlikely (rc != 0 && errno != EAGAIN)) {
1349         return -1;
1350     }
1351 
1352     //  If we have the message, return immediately.
1353     if (rc == 0) {
1354         extract_flags (msg_);
1355         return 0;
1356     }
1357 
1358     //  If the message cannot be fetched immediately, there are two scenarios.
1359     //  For non-blocking recv, commands are processed in case there's an
1360     //  activate_reader command already waiting in a command pipe.
1361     //  If it's not, return EAGAIN.
1362     if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
1363         if (unlikely (process_commands (0, false) != 0)) {
1364             return -1;
1365         }
1366         _ticks = 0;
1367 
1368         rc = xrecv (msg_);
1369         if (rc < 0) {
1370             return rc;
1371         }
1372         extract_flags (msg_);
1373 
1374         return 0;
1375     }
1376 
1377     //  Compute the time when the timeout should occur.
1378     //  If the timeout is infinite, don't care.
1379     int timeout = options.rcvtimeo;
1380     const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
1381 
1382     //  In blocking scenario, commands are processed over and over again until
1383     //  we are able to fetch a message.
1384     bool block = (_ticks != 0);
1385     while (true) {
1386         if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
1387             return -1;
1388         }
1389         rc = xrecv (msg_);
1390         if (rc == 0) {
1391             _ticks = 0;
1392             break;
1393         }
1394         if (unlikely (errno != EAGAIN)) {
1395             return -1;
1396         }
1397         block = true;
1398         if (timeout > 0) {
1399             timeout = static_cast<int> (end - _clock.now_ms ());
1400             if (timeout <= 0) {
1401                 errno = EAGAIN;
1402                 return -1;
1403             }
1404         }
1405     }
1406 
1407     extract_flags (msg_);
1408     return 0;
1409 }
1410 
1411 int zmq::socket_base_t::close ()
1412 {
1413     scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1414 
1415     //  Remove all existing signalers for thread safe sockets
1416     if (_thread_safe)
1417         (static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();
1418 
1419     //  Mark the socket as dead
1420     _tag = 0xdeadbeef;
1421 
1422 
1423     //  Transfer the ownership of the socket from this application thread
1424     //  to the reaper thread which will take care of the rest of shutdown
1425     //  process.
1426     send_reap (this);
1427 
1428     return 0;
1429 }
1430 
1431 bool zmq::socket_base_t::has_in ()
1432 {
1433     return xhas_in ();
1434 }
1435 
1436 bool zmq::socket_base_t::has_out ()
1437 {
1438     return xhas_out ();
1439 }
1440 
1441 void zmq::socket_base_t::start_reaping (poller_t *poller_)
1442 {
1443     //  Plug the socket to the reaper thread.
1444     _poller = poller_;
1445 
1446     fd_t fd;
1447 
1448     if (!_thread_safe)
1449         fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();
1450     else {
1451         scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1452 
1453         _reaper_signaler = new (std::nothrow) signaler_t ();
1454         zmq_assert (_reaper_signaler);
1455 
1456         //  Add signaler to the safe mailbox
1457         fd = _reaper_signaler->get_fd ();
1458         (static_cast<mailbox_safe_t *> (_mailbox))
1459           ->add_signaler (_reaper_signaler);
1460 
1461         //  Send a signal to make sure reaper handle existing commands
1462         _reaper_signaler->send ();
1463     }
1464 
1465     _handle = _poller->add_fd (fd, this);
1466     _poller->set_pollin (_handle);
1467 
1468     //  Initialise the termination and check whether it can be deallocated
1469     //  immediately.
1470     terminate ();
1471     check_destroy ();
1472 }
1473 
1474 int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
1475 {
1476     if (timeout_ == 0) {
1477         //  If we are asked not to wait, check whether we haven't processed
1478         //  commands recently, so that we can throttle the new commands.
1479 
1480         //  Get the CPU's tick counter. If 0, the counter is not available.
1481         const uint64_t tsc = zmq::clock_t::rdtsc ();
1482 
1483         //  Optimised version of command processing - it doesn't have to check
1484         //  for incoming commands each time. It does so only if certain time
1485         //  elapsed since last command processing. Command delay varies
1486         //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
1487         //  etc. The optimisation makes sense only on platforms where getting
1488         //  a timestamp is a very cheap operation (tens of nanoseconds).
1489         if (tsc && throttle_) {
1490             //  Check whether TSC haven't jumped backwards (in case of migration
1491             //  between CPU cores) and whether certain time have elapsed since
1492             //  last command processing. If it didn't do nothing.
1493             if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
1494                 return 0;
1495             _last_tsc = tsc;
1496         }
1497     }
1498 
1499     //  Check whether there are any commands pending for this thread.
1500     command_t cmd;
1501     int rc = _mailbox->recv (&cmd, timeout_);
1502 
1503     //  Process all available commands.
1504     while (rc == 0) {
1505         cmd.destination->process_command (cmd);
1506         rc = _mailbox->recv (&cmd, 0);
1507     }
1508 
1509     if (errno == EINTR)
1510         return -1;
1511 
1512     zmq_assert (errno == EAGAIN);
1513 
1514     if (_ctx_terminated) {
1515         errno = ETERM;
1516         return -1;
1517     }
1518 
1519     return 0;
1520 }
1521 
1522 void zmq::socket_base_t::process_stop ()
1523 {
1524     //  Here, someone have called zmq_ctx_term while the socket was still alive.
1525     //  We'll remember the fact so that any blocking call is interrupted and any
1526     //  further attempt to use the socket will return ETERM. The user is still
1527     //  responsible for calling zmq_close on the socket though!
1528     scoped_lock_t lock (_monitor_sync);
1529     stop_monitor ();
1530 
1531     _ctx_terminated = true;
1532 }
1533 
1534 void zmq::socket_base_t::process_bind (pipe_t *pipe_)
1535 {
1536     attach_pipe (pipe_);
1537 }
1538 
1539 void zmq::socket_base_t::process_term (int linger_)
1540 {
1541     //  Unregister all inproc endpoints associated with this socket.
1542     //  Doing this we make sure that no new pipes from other sockets (inproc)
1543     //  will be initiated.
1544     unregister_endpoints (this);
1545 
1546     //  Ask all attached pipes to terminate.
1547     for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1548         //  Only inprocs might have a disconnect message set
1549         _pipes[i]->send_disconnect_msg ();
1550         _pipes[i]->terminate (false);
1551     }
1552     register_term_acks (static_cast<int> (_pipes.size ()));
1553 
1554     //  Continue the termination process immediately.
1555     own_t::process_term (linger_);
1556 }
1557 
1558 void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
1559 {
1560     term_endpoint (endpoint_->c_str ());
1561     delete endpoint_;
1562 }
1563 
1564 void zmq::socket_base_t::process_pipe_stats_publish (
1565   uint64_t outbound_queue_count_,
1566   uint64_t inbound_queue_count_,
1567   endpoint_uri_pair_t *endpoint_pair_)
1568 {
1569     uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
1570     event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
1571     delete endpoint_pair_;
1572 }
1573 
1574 /*
1575  * There are 2 pipes per connection, and the inbound one _must_ be queried from
1576  * the I/O thread. So ask the outbound pipe, in the application thread, to send
1577  * a message (pipe_peer_stats) to its peer. The message will carry the outbound
1578  * pipe stats and endpoint, and the reference to the socket object.
1579  * The inbound pipe on the I/O thread will then add its own stats and endpoint,
1580  * and write back a message to the socket object (pipe_stats_publish) which
1581  * will raise an event with the data.
1582  */
1583 int zmq::socket_base_t::query_pipes_stats ()
1584 {
1585     {
1586         scoped_lock_t lock (_monitor_sync);
1587         if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
1588             errno = EINVAL;
1589             return -1;
1590         }
1591     }
1592     if (_pipes.size () == 0) {
1593         errno = EAGAIN;
1594         return -1;
1595     }
1596     for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1597         _pipes[i]->send_stats_to_peer (this);
1598     }
1599 
1600     return 0;
1601 }
1602 
1603 void zmq::socket_base_t::update_pipe_options (int option_)
1604 {
1605     if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
1606         for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1607             _pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
1608             _pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
1609         }
1610     }
1611 }
1612 
1613 void zmq::socket_base_t::process_destroy ()
1614 {
1615     _destroyed = true;
1616 }
1617 
1618 int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1619 {
1620     errno = EINVAL;
1621     return -1;
1622 }
1623 
1624 bool zmq::socket_base_t::xhas_out ()
1625 {
1626     return false;
1627 }
1628 
1629 int zmq::socket_base_t::xsend (msg_t *)
1630 {
1631     errno = ENOTSUP;
1632     return -1;
1633 }
1634 
1635 bool zmq::socket_base_t::xhas_in ()
1636 {
1637     return false;
1638 }
1639 
1640 int zmq::socket_base_t::xjoin (const char *group_)
1641 {
1642     LIBZMQ_UNUSED (group_);
1643     errno = ENOTSUP;
1644     return -1;
1645 }
1646 
1647 int zmq::socket_base_t::xleave (const char *group_)
1648 {
1649     LIBZMQ_UNUSED (group_);
1650     errno = ENOTSUP;
1651     return -1;
1652 }
1653 
1654 int zmq::socket_base_t::xrecv (msg_t *)
1655 {
1656     errno = ENOTSUP;
1657     return -1;
1658 }
1659 
1660 void zmq::socket_base_t::xread_activated (pipe_t *)
1661 {
1662     zmq_assert (false);
1663 }
1664 void zmq::socket_base_t::xwrite_activated (pipe_t *)
1665 {
1666     zmq_assert (false);
1667 }
1668 
1669 void zmq::socket_base_t::xhiccuped (pipe_t *)
1670 {
1671     zmq_assert (false);
1672 }
1673 
1674 void zmq::socket_base_t::in_event ()
1675 {
1676     //  This function is invoked only once the socket is running in the context
1677     //  of the reaper thread. Process any commands from other threads/sockets
1678     //  that may be available at the moment. Ultimately, the socket will
1679     //  be destroyed.
1680     {
1681         scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1682 
1683         //  If the socket is thread safe we need to unsignal the reaper signaler
1684         if (_thread_safe)
1685             _reaper_signaler->recv ();
1686 
1687         process_commands (0, false);
1688     }
1689     check_destroy ();
1690 }
1691 
1692 void zmq::socket_base_t::out_event ()
1693 {
1694     zmq_assert (false);
1695 }
1696 
1697 void zmq::socket_base_t::timer_event (int)
1698 {
1699     zmq_assert (false);
1700 }
1701 
1702 void zmq::socket_base_t::check_destroy ()
1703 {
1704     //  If the object was already marked as destroyed, finish the deallocation.
1705     if (_destroyed) {
1706         //  Remove the socket from the reaper's poller.
1707         _poller->rm_fd (_handle);
1708 
1709         //  Remove the socket from the context.
1710         destroy_socket (this);
1711 
1712         //  Notify the reaper about the fact.
1713         send_reaped ();
1714 
1715         //  Deallocate.
1716         own_t::process_destroy ();
1717     }
1718 }
1719 
1720 void zmq::socket_base_t::read_activated (pipe_t *pipe_)
1721 {
1722     xread_activated (pipe_);
1723 }
1724 
1725 void zmq::socket_base_t::write_activated (pipe_t *pipe_)
1726 {
1727     xwrite_activated (pipe_);
1728 }
1729 
1730 void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
1731 {
1732     if (options.immediate == 1)
1733         pipe_->terminate (false);
1734     else
1735         // Notify derived sockets of the hiccup
1736         xhiccuped (pipe_);
1737 }
1738 
1739 void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
1740 {
1741     //  Notify the specific socket type about the pipe termination.
1742     xpipe_terminated (pipe_);
1743 
1744     // Remove pipe from inproc pipes
1745     _inprocs.erase_pipe (pipe_);
1746 
1747     //  Remove the pipe from the list of attached pipes and confirm its
1748     //  termination if we are already shutting down.
1749     _pipes.erase (pipe_);
1750 
1751     // Remove the pipe from _endpoints (set it to NULL).
1752     const std::string &identifier = pipe_->get_endpoint_pair ().identifier ();
1753     if (!identifier.empty ()) {
1754         std::pair<endpoints_t::iterator, endpoints_t::iterator> range;
1755         range = _endpoints.equal_range (identifier);
1756 
1757         for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1758             if (it->second.second == pipe_) {
1759                 it->second.second = NULL;
1760                 break;
1761             }
1762         }
1763     }
1764 
1765     if (is_terminating ())
1766         unregister_term_ack ();
1767 }
1768 
1769 void zmq::socket_base_t::extract_flags (const msg_t *msg_)
1770 {
1771     //  Test whether routing_id flag is valid for this socket type.
1772     if (unlikely (msg_->flags () & msg_t::routing_id))
1773         zmq_assert (options.recv_routing_id);
1774 
1775     //  Remove MORE flag.
1776     _rcvmore = (msg_->flags () & msg_t::more) != 0;
1777 }
1778 
1779 int zmq::socket_base_t::monitor (const char *endpoint_,
1780                                  uint64_t events_,
1781                                  int event_version_,
1782                                  int type_)
1783 {
1784     scoped_lock_t lock (_monitor_sync);
1785 
1786     if (unlikely (_ctx_terminated)) {
1787         errno = ETERM;
1788         return -1;
1789     }
1790 
1791     //  Event version 1 supports only first 16 events.
1792     if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
1793         errno = EINVAL;
1794         return -1;
1795     }
1796 
1797     //  Support deregistering monitoring endpoints as well
1798     if (endpoint_ == NULL) {
1799         stop_monitor ();
1800         return 0;
1801     }
1802     //  Parse endpoint_uri_ string.
1803     std::string protocol;
1804     std::string address;
1805     if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
1806         return -1;
1807 
1808     //  Event notification only supported over inproc://
1809     if (protocol != protocol_name::inproc) {
1810         errno = EPROTONOSUPPORT;
1811         return -1;
1812     }
1813 
1814     // already monitoring. Stop previous monitor before starting new one.
1815     if (_monitor_socket != NULL) {
1816         stop_monitor (true);
1817     }
1818 
1819     // Check if the specified socket type is supported. It must be a
1820     // one-way socket types that support the SNDMORE flag.
1821     switch (type_) {
1822         case ZMQ_PAIR:
1823             break;
1824         case ZMQ_PUB:
1825             break;
1826         case ZMQ_PUSH:
1827             break;
1828         default:
1829             errno = EINVAL;
1830             return -1;
1831     }
1832 
1833     //  Register events to monitor
1834     _monitor_events = events_;
1835     options.monitor_event_version = event_version_;
1836     //  Create a monitor socket of the specified type.
1837     _monitor_socket = zmq_socket (get_ctx (), type_);
1838     if (_monitor_socket == NULL)
1839         return -1;
1840 
1841     //  Never block context termination on pending event messages
1842     int linger = 0;
1843     int rc =
1844       zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1845     if (rc == -1)
1846         stop_monitor (false);
1847 
1848     //  Spawn the monitor socket endpoint
1849     rc = zmq_bind (_monitor_socket, endpoint_);
1850     if (rc == -1)
1851         stop_monitor (false);
1852     return rc;
1853 }
1854 
1855 void zmq::socket_base_t::event_connected (
1856   const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1857 {
1858     uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1859     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
1860 }
1861 
1862 void zmq::socket_base_t::event_connect_delayed (
1863   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1864 {
1865     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1866     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
1867 }
1868 
1869 void zmq::socket_base_t::event_connect_retried (
1870   const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
1871 {
1872     uint64_t values[1] = {static_cast<uint64_t> (interval_)};
1873     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
1874 }
1875 
1876 void zmq::socket_base_t::event_listening (
1877   const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1878 {
1879     uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1880     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
1881 }
1882 
1883 void zmq::socket_base_t::event_bind_failed (
1884   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1885 {
1886     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1887     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
1888 }
1889 
1890 void zmq::socket_base_t::event_accepted (
1891   const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1892 {
1893     uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1894     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
1895 }
1896 
1897 void zmq::socket_base_t::event_accept_failed (
1898   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1899 {
1900     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1901     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
1902 }
1903 
1904 void zmq::socket_base_t::event_closed (
1905   const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1906 {
1907     uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1908     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
1909 }
1910 
1911 void zmq::socket_base_t::event_close_failed (
1912   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1913 {
1914     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1915     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
1916 }
1917 
1918 void zmq::socket_base_t::event_disconnected (
1919   const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1920 {
1921     uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1922     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
1923 }
1924 
1925 void zmq::socket_base_t::event_handshake_failed_no_detail (
1926   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1927 {
1928     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1929     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
1930 }
1931 
1932 void zmq::socket_base_t::event_handshake_failed_protocol (
1933   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1934 {
1935     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1936     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1937 }
1938 
1939 void zmq::socket_base_t::event_handshake_failed_auth (
1940   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1941 {
1942     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1943     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1944 }
1945 
1946 void zmq::socket_base_t::event_handshake_succeeded (
1947   const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1948 {
1949     uint64_t values[1] = {static_cast<uint64_t> (err_)};
1950     event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
1951 }
1952 
1953 void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
1954                                 uint64_t values_[],
1955                                 uint64_t values_count_,
1956                                 uint64_t type_)
1957 {
1958     scoped_lock_t lock (_monitor_sync);
1959     if (_monitor_events & type_) {
1960         monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
1961     }
1962 }
1963 
1964 //  Send a monitor event
1965 void zmq::socket_base_t::monitor_event (
1966   uint64_t event_,
1967   const uint64_t values_[],
1968   uint64_t values_count_,
1969   const endpoint_uri_pair_t &endpoint_uri_pair_) const
1970 {
1971     // this is a private method which is only called from
1972     // contexts where the _monitor_sync mutex has been locked before
1973 
1974     if (_monitor_socket) {
1975         zmq_msg_t msg;
1976 
1977         switch (options.monitor_event_version) {
1978             case 1: {
1979                 //  The API should not allow to activate unsupported events
1980                 zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
1981                 //  v1 only allows one value
1982                 zmq_assert (values_count_ == 1);
1983                 zmq_assert (values_[0]
1984                             <= std::numeric_limits<uint32_t>::max ());
1985 
1986                 //  Send event and value in first frame
1987                 const uint16_t event = static_cast<uint16_t> (event_);
1988                 const uint32_t value = static_cast<uint32_t> (values_[0]);
1989                 zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
1990                 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
1991                 //  Avoid dereferencing uint32_t on unaligned address
1992                 memcpy (data + 0, &event, sizeof (event));
1993                 memcpy (data + sizeof (event), &value, sizeof (value));
1994                 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1995 
1996                 const std::string &endpoint_uri =
1997                   endpoint_uri_pair_.identifier ();
1998 
1999                 //  Send address in second frame
2000                 zmq_msg_init_size (&msg, endpoint_uri.size ());
2001                 memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (),
2002                         endpoint_uri.size ());
2003                 zmq_msg_send (&msg, _monitor_socket, 0);
2004             } break;
2005             case 2: {
2006                 //  Send event in first frame (64bit unsigned)
2007                 zmq_msg_init_size (&msg, sizeof (event_));
2008                 memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
2009                 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
2010 
2011                 //  Send number of values that will follow in second frame
2012                 zmq_msg_init_size (&msg, sizeof (values_count_));
2013                 memcpy (zmq_msg_data (&msg), &values_count_,
2014                         sizeof (values_count_));
2015                 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
2016 
2017                 //  Send values in third-Nth frames (64bit unsigned)
2018                 for (uint64_t i = 0; i < values_count_; ++i) {
2019                     zmq_msg_init_size (&msg, sizeof (values_[i]));
2020                     memcpy (zmq_msg_data (&msg), &values_[i],
2021                             sizeof (values_[i]));
2022                     zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
2023                 }
2024 
2025                 //  Send local endpoint URI in second-to-last frame (string)
2026                 zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
2027                 memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
2028                         endpoint_uri_pair_.local.size ());
2029                 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
2030 
2031                 //  Send remote endpoint URI in last frame (string)
2032                 zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
2033                 memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
2034                         endpoint_uri_pair_.remote.size ());
2035                 zmq_msg_send (&msg, _monitor_socket, 0);
2036             } break;
2037         }
2038     }
2039 }
2040 
2041 void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
2042 {
2043     // this is a private method which is only called from
2044     // contexts where the _monitor_sync mutex has been locked before
2045 
2046     if (_monitor_socket) {
2047         if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
2048             && send_monitor_stopped_event_) {
2049             uint64_t values[1] = {0};
2050             monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
2051                            endpoint_uri_pair_t ());
2052         }
2053         zmq_close (_monitor_socket);
2054         _monitor_socket = NULL;
2055         _monitor_events = 0;
2056     }
2057 }
2058 
2059 bool zmq::socket_base_t::is_disconnected () const
2060 {
2061     return _disconnected;
2062 }
2063 
2064 zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
2065                                                    uint32_t tid_,
2066                                                    int sid_) :
2067     socket_base_t (parent_, tid_, sid_)
2068 {
2069 }
2070 
2071 zmq::routing_socket_base_t::~routing_socket_base_t ()
2072 {
2073     zmq_assert (_out_pipes.empty ());
2074 }
2075 
2076 int zmq::routing_socket_base_t::xsetsockopt (int option_,
2077                                              const void *optval_,
2078                                              size_t optvallen_)
2079 {
2080     switch (option_) {
2081         case ZMQ_CONNECT_ROUTING_ID:
2082             // TODO why isn't it possible to set an empty connect_routing_id
2083             //   (which is the default value)
2084             if (optval_ && optvallen_) {
2085                 _connect_routing_id.assign (static_cast<const char *> (optval_),
2086                                             optvallen_);
2087                 return 0;
2088             }
2089             break;
2090     }
2091     errno = EINVAL;
2092     return -1;
2093 }
2094 
2095 void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
2096 {
2097     const out_pipes_t::iterator end = _out_pipes.end ();
2098     out_pipes_t::iterator it;
2099     for (it = _out_pipes.begin (); it != end; ++it)
2100         if (it->second.pipe == pipe_)
2101             break;
2102 
2103     zmq_assert (it != end);
2104     zmq_assert (!it->second.active);
2105     it->second.active = true;
2106 }
2107 
2108 std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
2109 {
2110     std::string res = ZMQ_MOVE (_connect_routing_id);
2111     _connect_routing_id.clear ();
2112     return res;
2113 }
2114 
2115 bool zmq::routing_socket_base_t::connect_routing_id_is_set () const
2116 {
2117     return !_connect_routing_id.empty ();
2118 }
2119 
2120 void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
2121                                                pipe_t *pipe_)
2122 {
2123     //  Add the record into output pipes lookup table
2124     const out_pipe_t outpipe = {pipe_, true};
2125     const bool ok =
2126       _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe)
2127         .second;
2128     zmq_assert (ok);
2129 }
2130 
2131 bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
2132 {
2133     return 0 != _out_pipes.count (routing_id_);
2134 }
2135 
2136 zmq::routing_socket_base_t::out_pipe_t *
2137 zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_)
2138 {
2139     // TODO we could probably avoid constructor a temporary blob_t to call this function
2140     out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2141     return it == _out_pipes.end () ? NULL : &it->second;
2142 }
2143 
2144 const zmq::routing_socket_base_t::out_pipe_t *
2145 zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
2146 {
2147     // TODO we could probably avoid constructor a temporary blob_t to call this function
2148     const out_pipes_t::const_iterator it = _out_pipes.find (routing_id_);
2149     return it == _out_pipes.end () ? NULL : &it->second;
2150 }
2151 
2152 void zmq::routing_socket_base_t::erase_out_pipe (const pipe_t *pipe_)
2153 {
2154     const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
2155     zmq_assert (erased);
2156 }
2157 
2158 zmq::routing_socket_base_t::out_pipe_t
2159 zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id_)
2160 {
2161     const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2162     out_pipe_t res = {NULL, false};
2163     if (it != _out_pipes.end ()) {
2164         res = it->second;
2165         _out_pipes.erase (it);
2166     }
2167     return res;
2168 }
2169