1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "session_base.hpp"
33 #include "i_engine.hpp"
34 #include "err.hpp"
35 #include "pipe.hpp"
36 #include "likely.hpp"
37 #include "tcp_connecter.hpp"
38 #include "ws_connecter.hpp"
39 #include "ipc_connecter.hpp"
40 #include "tipc_connecter.hpp"
41 #include "socks_connecter.hpp"
42 #include "vmci_connecter.hpp"
43 #include "pgm_sender.hpp"
44 #include "pgm_receiver.hpp"
45 #include "address.hpp"
46 #include "norm_engine.hpp"
47 #include "udp_engine.hpp"
48 
49 #include "ctx.hpp"
50 #include "req.hpp"
51 #include "radio.hpp"
52 #include "dish.hpp"
53 
create(class io_thread_t * io_thread_,bool active_,class socket_base_t * socket_,const options_t & options_,address_t * addr_)54 zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
55                                                   bool active_,
56                                                   class socket_base_t *socket_,
57                                                   const options_t &options_,
58                                                   address_t *addr_)
59 {
60     session_base_t *s = NULL;
61     switch (options_.type) {
62         case ZMQ_REQ:
63             s = new (std::nothrow)
64               req_session_t (io_thread_, active_, socket_, options_, addr_);
65             break;
66         case ZMQ_RADIO:
67             s = new (std::nothrow)
68               radio_session_t (io_thread_, active_, socket_, options_, addr_);
69             break;
70         case ZMQ_DISH:
71             s = new (std::nothrow)
72               dish_session_t (io_thread_, active_, socket_, options_, addr_);
73             break;
74         case ZMQ_DEALER:
75         case ZMQ_REP:
76         case ZMQ_ROUTER:
77         case ZMQ_PUB:
78         case ZMQ_XPUB:
79         case ZMQ_SUB:
80         case ZMQ_XSUB:
81         case ZMQ_PUSH:
82         case ZMQ_PULL:
83         case ZMQ_PAIR:
84         case ZMQ_STREAM:
85         case ZMQ_SERVER:
86         case ZMQ_CLIENT:
87         case ZMQ_GATHER:
88         case ZMQ_SCATTER:
89         case ZMQ_DGRAM:
90         case ZMQ_PEER:
91         case ZMQ_CHANNEL:
92 #ifdef ZMQ_BUILD_DRAFT_API
93             if (options_.can_send_hello_msg && options_.hello_msg.size () > 0)
94                 s = new (std::nothrow) hello_msg_session_t (
95                   io_thread_, active_, socket_, options_, addr_);
96             else
97                 s = new (std::nothrow) session_base_t (
98                   io_thread_, active_, socket_, options_, addr_);
99 
100             break;
101 #else
102             s = new (std::nothrow)
103               session_base_t (io_thread_, active_, socket_, options_, addr_);
104             break;
105 #endif
106 
107         default:
108             errno = EINVAL;
109             return NULL;
110     }
111     alloc_assert (s);
112     return s;
113 }
114 
session_base_t(class io_thread_t * io_thread_,bool active_,class socket_base_t * socket_,const options_t & options_,address_t * addr_)115 zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
116                                      bool active_,
117                                      class socket_base_t *socket_,
118                                      const options_t &options_,
119                                      address_t *addr_) :
120     own_t (io_thread_, options_),
121     io_object_t (io_thread_),
122     _active (active_),
123     _pipe (NULL),
124     _zap_pipe (NULL),
125     _incomplete_in (false),
126     _pending (false),
127     _engine (NULL),
128     _socket (socket_),
129     _io_thread (io_thread_),
130     _has_linger_timer (false),
131     _addr (addr_)
132 #ifdef ZMQ_HAVE_WSS
133     ,
134     _wss_hostname (options_.wss_hostname)
135 #endif
136 {
137 }
138 
get_endpoint() const139 const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
140 {
141     return _engine->get_endpoint ();
142 }
143 
~session_base_t()144 zmq::session_base_t::~session_base_t ()
145 {
146     zmq_assert (!_pipe);
147     zmq_assert (!_zap_pipe);
148 
149     //  If there's still a pending linger timer, remove it.
150     if (_has_linger_timer) {
151         cancel_timer (linger_timer_id);
152         _has_linger_timer = false;
153     }
154 
155     //  Close the engine.
156     if (_engine)
157         _engine->terminate ();
158 
159     LIBZMQ_DELETE (_addr);
160 }
161 
attach_pipe(pipe_t * pipe_)162 void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
163 {
164     zmq_assert (!is_terminating ());
165     zmq_assert (!_pipe);
166     zmq_assert (pipe_);
167     _pipe = pipe_;
168     _pipe->set_event_sink (this);
169 }
170 
pull_msg(msg_t * msg_)171 int zmq::session_base_t::pull_msg (msg_t *msg_)
172 {
173     if (!_pipe || !_pipe->read (msg_)) {
174         errno = EAGAIN;
175         return -1;
176     }
177 
178     _incomplete_in = (msg_->flags () & msg_t::more) != 0;
179 
180     return 0;
181 }
182 
push_msg(msg_t * msg_)183 int zmq::session_base_t::push_msg (msg_t *msg_)
184 {
185     //  pass subscribe/cancel to the sockets
186     if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
187         && !msg_->is_cancel ())
188         return 0;
189     if (_pipe && _pipe->write (msg_)) {
190         const int rc = msg_->init ();
191         errno_assert (rc == 0);
192         return 0;
193     }
194 
195     errno = EAGAIN;
196     return -1;
197 }
198 
read_zap_msg(msg_t * msg_)199 int zmq::session_base_t::read_zap_msg (msg_t *msg_)
200 {
201     if (_zap_pipe == NULL) {
202         errno = ENOTCONN;
203         return -1;
204     }
205 
206     if (!_zap_pipe->read (msg_)) {
207         errno = EAGAIN;
208         return -1;
209     }
210 
211     return 0;
212 }
213 
write_zap_msg(msg_t * msg_)214 int zmq::session_base_t::write_zap_msg (msg_t *msg_)
215 {
216     if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
217         errno = ENOTCONN;
218         return -1;
219     }
220 
221     if ((msg_->flags () & msg_t::more) == 0)
222         _zap_pipe->flush ();
223 
224     const int rc = msg_->init ();
225     errno_assert (rc == 0);
226     return 0;
227 }
228 
reset()229 void zmq::session_base_t::reset ()
230 {
231 }
232 
flush()233 void zmq::session_base_t::flush ()
234 {
235     if (_pipe)
236         _pipe->flush ();
237 }
238 
rollback()239 void zmq::session_base_t::rollback ()
240 {
241     if (_pipe)
242         _pipe->rollback ();
243 }
244 
clean_pipes()245 void zmq::session_base_t::clean_pipes ()
246 {
247     zmq_assert (_pipe != NULL);
248 
249     //  Get rid of half-processed messages in the out pipe. Flush any
250     //  unflushed messages upstream.
251     _pipe->rollback ();
252     _pipe->flush ();
253 
254     //  Remove any half-read message from the in pipe.
255     while (_incomplete_in) {
256         msg_t msg;
257         int rc = msg.init ();
258         errno_assert (rc == 0);
259         rc = pull_msg (&msg);
260         errno_assert (rc == 0);
261         rc = msg.close ();
262         errno_assert (rc == 0);
263     }
264 }
265 
pipe_terminated(pipe_t * pipe_)266 void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
267 {
268     // Drop the reference to the deallocated pipe if required.
269     zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
270                 || _terminating_pipes.count (pipe_) == 1);
271 
272     if (pipe_ == _pipe) {
273         // If this is our current pipe, remove it
274         _pipe = NULL;
275         if (_has_linger_timer) {
276             cancel_timer (linger_timer_id);
277             _has_linger_timer = false;
278         }
279     } else if (pipe_ == _zap_pipe)
280         _zap_pipe = NULL;
281     else
282         // Remove the pipe from the detached pipes set
283         _terminating_pipes.erase (pipe_);
284 
285     if (!is_terminating () && options.raw_socket) {
286         if (_engine) {
287             _engine->terminate ();
288             _engine = NULL;
289         }
290         terminate ();
291     }
292 
293     //  If we are waiting for pending messages to be sent, at this point
294     //  we are sure that there will be no more messages and we can proceed
295     //  with termination safely.
296     if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
297         _pending = false;
298         own_t::process_term (0);
299     }
300 }
301 
read_activated(pipe_t * pipe_)302 void zmq::session_base_t::read_activated (pipe_t *pipe_)
303 {
304     // Skip activating if we're detaching this pipe
305     if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
306         zmq_assert (_terminating_pipes.count (pipe_) == 1);
307         return;
308     }
309 
310     if (unlikely (_engine == NULL)) {
311         if (_pipe)
312             _pipe->check_read ();
313         return;
314     }
315 
316     if (likely (pipe_ == _pipe))
317         _engine->restart_output ();
318     else {
319         // i.e. pipe_ == zap_pipe
320         _engine->zap_msg_available ();
321     }
322 }
323 
write_activated(pipe_t * pipe_)324 void zmq::session_base_t::write_activated (pipe_t *pipe_)
325 {
326     // Skip activating if we're detaching this pipe
327     if (_pipe != pipe_) {
328         zmq_assert (_terminating_pipes.count (pipe_) == 1);
329         return;
330     }
331 
332     if (_engine)
333         _engine->restart_input ();
334 }
335 
hiccuped(pipe_t *)336 void zmq::session_base_t::hiccuped (pipe_t *)
337 {
338     //  Hiccups are always sent from session to socket, not the other
339     //  way round.
340     zmq_assert (false);
341 }
342 
get_socket() const343 zmq::socket_base_t *zmq::session_base_t::get_socket () const
344 {
345     return _socket;
346 }
347 
process_plug()348 void zmq::session_base_t::process_plug ()
349 {
350     if (_active)
351         start_connecting (false);
352 }
353 
//  This function returns 0 on success, or -1 with errno set to ECONNREFUSED
//  if ZAP is not set up (i.e. inproc://zeromq.zap.01 does not exist in the
//  same context). It aborts on any other error. In other words, either ZAP
//  is not configured, or, if it is configured, it MUST be configured
//  correctly and it MUST work; otherwise authentication cannot be guaranteed,
//  which would be a security flaw.
zap_connect()360 int zmq::session_base_t::zap_connect ()
361 {
362     if (_zap_pipe != NULL)
363         return 0;
364 
365     endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
366     if (peer.socket == NULL) {
367         errno = ECONNREFUSED;
368         return -1;
369     }
370     zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
371                 || peer.options.type == ZMQ_SERVER);
372 
373     //  Create a bi-directional pipe that will connect
374     //  session with zap socket.
375     object_t *parents[2] = {this, peer.socket};
376     pipe_t *new_pipes[2] = {NULL, NULL};
377     int hwms[2] = {0, 0};
378     bool conflates[2] = {false, false};
379     int rc = pipepair (parents, new_pipes, hwms, conflates);
380     errno_assert (rc == 0);
381 
382     //  Attach local end of the pipe to this socket object.
383     _zap_pipe = new_pipes[0];
384     _zap_pipe->set_nodelay ();
385     _zap_pipe->set_event_sink (this);
386 
387     send_bind (peer.socket, new_pipes[1], false);
388 
389     //  Send empty routing id if required by the peer.
390     if (peer.options.recv_routing_id) {
391         msg_t id;
392         rc = id.init ();
393         errno_assert (rc == 0);
394         id.set_flags (msg_t::routing_id);
395         bool ok = _zap_pipe->write (&id);
396         zmq_assert (ok);
397         _zap_pipe->flush ();
398     }
399 
400     return 0;
401 }
402 
zap_enabled() const403 bool zmq::session_base_t::zap_enabled () const
404 {
405     return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
406 }
407 
process_attach(i_engine * engine_)408 void zmq::session_base_t::process_attach (i_engine *engine_)
409 {
410     zmq_assert (engine_ != NULL);
411     zmq_assert (!_engine);
412     _engine = engine_;
413 
414     if (!engine_->has_handshake_stage ())
415         engine_ready ();
416 
417     //  Plug in the engine.
418     _engine->plug (_io_thread, this);
419 }
420 
engine_ready()421 void zmq::session_base_t::engine_ready ()
422 {
423     //  Create the pipe if it does not exist yet.
424     if (!_pipe && !is_terminating ()) {
425         object_t *parents[2] = {this, _socket};
426         pipe_t *pipes[2] = {NULL, NULL};
427 
428         const bool conflate = get_effective_conflate_option (options);
429 
430         int hwms[2] = {conflate ? -1 : options.rcvhwm,
431                        conflate ? -1 : options.sndhwm};
432         bool conflates[2] = {conflate, conflate};
433         const int rc = pipepair (parents, pipes, hwms, conflates);
434         errno_assert (rc == 0);
435 
436         //  Plug the local end of the pipe.
437         pipes[0]->set_event_sink (this);
438 
439         //  Remember the local end of the pipe.
440         zmq_assert (!_pipe);
441         _pipe = pipes[0];
442 
443         //  The endpoints strings are not set on bind, set them here so that
444         //  events can use them.
445         pipes[0]->set_endpoint_pair (_engine->get_endpoint ());
446         pipes[1]->set_endpoint_pair (_engine->get_endpoint ());
447 
448         //  Ask socket to plug into the remote end of the pipe.
449         send_bind (_socket, pipes[1]);
450     }
451 }
452 
engine_error(bool handshaked_,zmq::i_engine::error_reason_t reason_)453 void zmq::session_base_t::engine_error (bool handshaked_,
454                                         zmq::i_engine::error_reason_t reason_)
455 {
456     //  Engine is dead. Let's forget about it.
457     _engine = NULL;
458 
459     //  Remove any half-done messages from the pipes.
460     if (_pipe) {
461         clean_pipes ();
462 
463 #ifdef ZMQ_BUILD_DRAFT_API
464         //  Only send disconnect message if socket was accepted and handshake was completed
465         if (!_active && handshaked_ && options.can_recv_disconnect_msg
466             && !options.disconnect_msg.empty ()) {
467             _pipe->set_disconnect_msg (options.disconnect_msg);
468             _pipe->send_disconnect_msg ();
469         }
470 #endif
471     }
472 
473     zmq_assert (reason_ == i_engine::connection_error
474                 || reason_ == i_engine::timeout_error
475                 || reason_ == i_engine::protocol_error);
476 
477     switch (reason_) {
478         case i_engine::timeout_error:
479             /* FALLTHROUGH */
480         case i_engine::connection_error:
481             if (_active) {
482                 reconnect ();
483                 break;
484             }
485 
486         case i_engine::protocol_error:
487             if (_pending) {
488                 if (_pipe)
489                     _pipe->terminate (false);
490                 if (_zap_pipe)
491                     _zap_pipe->terminate (false);
492             } else {
493                 terminate ();
494             }
495             break;
496     }
497 
498     //  Just in case there's only a delimiter in the pipe.
499     if (_pipe)
500         _pipe->check_read ();
501 
502     if (_zap_pipe)
503         _zap_pipe->check_read ();
504 }
505 
process_term(int linger_)506 void zmq::session_base_t::process_term (int linger_)
507 {
508     zmq_assert (!_pending);
509 
510     //  If the termination of the pipe happens before the term command is
511     //  delivered there's nothing much to do. We can proceed with the
512     //  standard termination immediately.
513     if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
514         own_t::process_term (0);
515         return;
516     }
517 
518     _pending = true;
519 
520     if (_pipe != NULL) {
521         //  If there's finite linger value, delay the termination.
522         //  If linger is infinite (negative) we don't even have to set
523         //  the timer.
524         if (linger_ > 0) {
525             zmq_assert (!_has_linger_timer);
526             add_timer (linger_, linger_timer_id);
527             _has_linger_timer = true;
528         }
529 
530         //  Start pipe termination process. Delay the termination till all messages
531         //  are processed in case the linger time is non-zero.
532         _pipe->terminate (linger_ != 0);
533 
534         //  TODO: Should this go into pipe_t::terminate ?
535         //  In case there's no engine and there's only delimiter in the
536         //  pipe it wouldn't be ever read. Thus we check for it explicitly.
537         if (!_engine)
538             _pipe->check_read ();
539     }
540 
541     if (_zap_pipe != NULL)
542         _zap_pipe->terminate (false);
543 }
544 
timer_event(int id_)545 void zmq::session_base_t::timer_event (int id_)
546 {
547     //  Linger period expired. We can proceed with termination even though
548     //  there are still pending messages to be sent.
549     zmq_assert (id_ == linger_timer_id);
550     _has_linger_timer = false;
551 
552     //  Ask pipe to terminate even though there may be pending messages in it.
553     zmq_assert (_pipe);
554     _pipe->terminate (false);
555 }
556 
process_conn_failed()557 void zmq::session_base_t::process_conn_failed ()
558 {
559     std::string *ep = new (std::string);
560     _addr->to_string (*ep);
561     send_term_endpoint (_socket, ep);
562 }
563 
reconnect()564 void zmq::session_base_t::reconnect ()
565 {
566     //  For delayed connect situations, terminate the pipe
567     //  and reestablish later on
568     if (_pipe && options.immediate == 1
569 #ifdef ZMQ_HAVE_OPENPGM
570         && _addr->protocol != protocol_name::pgm
571         && _addr->protocol != protocol_name::epgm
572 #endif
573 #ifdef ZMQ_HAVE_NORM
574         && _addr->protocol != protocol_name::norm
575 #endif
576         && _addr->protocol != protocol_name::udp) {
577         _pipe->hiccup ();
578         _pipe->terminate (false);
579         _terminating_pipes.insert (_pipe);
580         _pipe = NULL;
581 
582         if (_has_linger_timer) {
583             cancel_timer (linger_timer_id);
584             _has_linger_timer = false;
585         }
586     }
587 
588     reset ();
589 
590     //  Reconnect.
591     if (options.reconnect_ivl > 0)
592         start_connecting (true);
593     else {
594         std::string *ep = new (std::string);
595         _addr->to_string (*ep);
596         send_term_endpoint (_socket, ep);
597     }
598 
599     //  For subscriber sockets we hiccup the inbound pipe, which will cause
600     //  the socket object to resend all the subscriptions.
601     if (_pipe
602         && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
603             || options.type == ZMQ_DISH))
604         _pipe->hiccup ();
605 }
606 
start_connecting(bool wait_)607 void zmq::session_base_t::start_connecting (bool wait_)
608 {
609     zmq_assert (_active);
610 
611     //  Choose I/O thread to run connecter in. Given that we are already
612     //  running in an I/O thread, there must be at least one available.
613     io_thread_t *io_thread = choose_io_thread (options.affinity);
614     zmq_assert (io_thread);
615 
616     //  Create the connecter object.
617     own_t *connecter = NULL;
618     if (_addr->protocol == protocol_name::tcp) {
619         if (!options.socks_proxy_address.empty ()) {
620             address_t *proxy_address = new (std::nothrow)
621               address_t (protocol_name::tcp, options.socks_proxy_address,
622                          this->get_ctx ());
623             alloc_assert (proxy_address);
624             connecter = new (std::nothrow) socks_connecter_t (
625               io_thread, this, options, _addr, proxy_address, wait_);
626             alloc_assert (connecter);
627             if (!options.socks_proxy_username.empty ()) {
628                 reinterpret_cast<socks_connecter_t *> (connecter)
629                   ->set_auth_method_basic (options.socks_proxy_username,
630                                            options.socks_proxy_password);
631             }
632         } else {
633             connecter = new (std::nothrow)
634               tcp_connecter_t (io_thread, this, options, _addr, wait_);
635         }
636     }
637 #if defined ZMQ_HAVE_IPC
638     else if (_addr->protocol == protocol_name::ipc) {
639         connecter = new (std::nothrow)
640           ipc_connecter_t (io_thread, this, options, _addr, wait_);
641     }
642 #endif
643 #if defined ZMQ_HAVE_TIPC
644     else if (_addr->protocol == protocol_name::tipc) {
645         connecter = new (std::nothrow)
646           tipc_connecter_t (io_thread, this, options, _addr, wait_);
647     }
648 #endif
649 #if defined ZMQ_HAVE_VMCI
650     else if (_addr->protocol == protocol_name::vmci) {
651         connecter = new (std::nothrow)
652           vmci_connecter_t (io_thread, this, options, _addr, wait_);
653     }
654 #endif
655 #if defined ZMQ_HAVE_WS
656     else if (_addr->protocol == protocol_name::ws) {
657         connecter = new (std::nothrow) ws_connecter_t (
658           io_thread, this, options, _addr, wait_, false, std::string ());
659     }
660 #endif
661 #if defined ZMQ_HAVE_WSS
662     else if (_addr->protocol == protocol_name::wss) {
663         connecter = new (std::nothrow) ws_connecter_t (
664           io_thread, this, options, _addr, wait_, true, _wss_hostname);
665     }
666 #endif
667     if (connecter != NULL) {
668         alloc_assert (connecter);
669         launch_child (connecter);
670         return;
671     }
672 
673     if (_addr->protocol == protocol_name::udp) {
674         zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
675                     || options.type == ZMQ_DGRAM);
676 
677         udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
678         alloc_assert (engine);
679 
680         bool recv = false;
681         bool send = false;
682 
683         if (options.type == ZMQ_RADIO) {
684             send = true;
685             recv = false;
686         } else if (options.type == ZMQ_DISH) {
687             send = false;
688             recv = true;
689         } else if (options.type == ZMQ_DGRAM) {
690             send = true;
691             recv = true;
692         }
693 
694         int rc = engine->init (_addr, send, recv);
695         errno_assert (rc == 0);
696 
697         send_attach (this, engine);
698 
699         return;
700     }
701 
702 #ifdef ZMQ_HAVE_OPENPGM
703 
704     //  Both PGM and EPGM transports are using the same infrastructure.
705     if (_addr->protocol == "pgm" || _addr->protocol == "epgm") {
706         zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
707                     || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
708 
709         //  For EPGM transport with UDP encapsulation of PGM is used.
710         bool const udp_encapsulation = _addr->protocol == "epgm";
711 
712         //  At this point we'll create message pipes to the session straight
713         //  away. There's no point in delaying it as no concept of 'connect'
714         //  exists with PGM anyway.
715         if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
716             //  PGM sender.
717             pgm_sender_t *pgm_sender =
718               new (std::nothrow) pgm_sender_t (io_thread, options);
719             alloc_assert (pgm_sender);
720 
721             int rc =
722               pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
723             errno_assert (rc == 0);
724 
725             send_attach (this, pgm_sender);
726         } else {
727             //  PGM receiver.
728             pgm_receiver_t *pgm_receiver =
729               new (std::nothrow) pgm_receiver_t (io_thread, options);
730             alloc_assert (pgm_receiver);
731 
732             int rc =
733               pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
734             errno_assert (rc == 0);
735 
736             send_attach (this, pgm_receiver);
737         }
738 
739         return;
740     }
741 #endif
742 
743 #ifdef ZMQ_HAVE_NORM
744     if (_addr->protocol == "norm") {
745         //  At this point we'll create message pipes to the session straight
746         //  away. There's no point in delaying it as no concept of 'connect'
747         //  exists with NORM anyway.
748         if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
749             //  NORM sender.
750             norm_engine_t *norm_sender =
751               new (std::nothrow) norm_engine_t (io_thread, options);
752             alloc_assert (norm_sender);
753 
754             int rc = norm_sender->init (_addr->address.c_str (), true, false);
755             errno_assert (rc == 0);
756 
757             send_attach (this, norm_sender);
758         } else { // ZMQ_SUB or ZMQ_XSUB
759 
760             //  NORM receiver.
761             norm_engine_t *norm_receiver =
762               new (std::nothrow) norm_engine_t (io_thread, options);
763             alloc_assert (norm_receiver);
764 
765             int rc = norm_receiver->init (_addr->address.c_str (), false, true);
766             errno_assert (rc == 0);
767 
768             send_attach (this, norm_receiver);
769         }
770         return;
771     }
772 #endif // ZMQ_HAVE_NORM
773 
774     zmq_assert (false);
775 }
776 
hello_msg_session_t(io_thread_t * io_thread_,bool connect_,socket_base_t * socket_,const options_t & options_,address_t * addr_)777 zmq::hello_msg_session_t::hello_msg_session_t (io_thread_t *io_thread_,
778                                                bool connect_,
779                                                socket_base_t *socket_,
780                                                const options_t &options_,
781                                                address_t *addr_) :
782     session_base_t (io_thread_, connect_, socket_, options_, addr_),
783     _new_pipe (true)
784 {
785 }
786 
~hello_msg_session_t()787 zmq::hello_msg_session_t::~hello_msg_session_t ()
788 {
789 }
790 
791 
pull_msg(msg_t * msg_)792 int zmq::hello_msg_session_t::pull_msg (msg_t *msg_)
793 {
794     if (_new_pipe) {
795         _new_pipe = false;
796 
797         const int rc =
798           msg_->init_buffer (&options.hello_msg[0], options.hello_msg.size ());
799         errno_assert (rc == 0);
800 
801         return 0;
802     }
803 
804     return session_base_t::pull_msg (msg_);
805 }
806 
reset()807 void zmq::hello_msg_session_t::reset ()
808 {
809     session_base_t::reset ();
810     _new_pipe = true;
811 }
812