1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "session_base.hpp"
33 #include "i_engine.hpp"
34 #include "err.hpp"
35 #include "pipe.hpp"
36 #include "likely.hpp"
37 #include "tcp_connecter.hpp"
38 #include "ws_connecter.hpp"
39 #include "ipc_connecter.hpp"
40 #include "tipc_connecter.hpp"
41 #include "socks_connecter.hpp"
42 #include "vmci_connecter.hpp"
43 #include "pgm_sender.hpp"
44 #include "pgm_receiver.hpp"
45 #include "address.hpp"
46 #include "norm_engine.hpp"
47 #include "udp_engine.hpp"
48
49 #include "ctx.hpp"
50 #include "req.hpp"
51 #include "radio.hpp"
52 #include "dish.hpp"
53
create(class io_thread_t * io_thread_,bool active_,class socket_base_t * socket_,const options_t & options_,address_t * addr_)54 zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
55 bool active_,
56 class socket_base_t *socket_,
57 const options_t &options_,
58 address_t *addr_)
59 {
60 session_base_t *s = NULL;
61 switch (options_.type) {
62 case ZMQ_REQ:
63 s = new (std::nothrow)
64 req_session_t (io_thread_, active_, socket_, options_, addr_);
65 break;
66 case ZMQ_RADIO:
67 s = new (std::nothrow)
68 radio_session_t (io_thread_, active_, socket_, options_, addr_);
69 break;
70 case ZMQ_DISH:
71 s = new (std::nothrow)
72 dish_session_t (io_thread_, active_, socket_, options_, addr_);
73 break;
74 case ZMQ_DEALER:
75 case ZMQ_REP:
76 case ZMQ_ROUTER:
77 case ZMQ_PUB:
78 case ZMQ_XPUB:
79 case ZMQ_SUB:
80 case ZMQ_XSUB:
81 case ZMQ_PUSH:
82 case ZMQ_PULL:
83 case ZMQ_PAIR:
84 case ZMQ_STREAM:
85 case ZMQ_SERVER:
86 case ZMQ_CLIENT:
87 case ZMQ_GATHER:
88 case ZMQ_SCATTER:
89 case ZMQ_DGRAM:
90 case ZMQ_PEER:
91 case ZMQ_CHANNEL:
92 #ifdef ZMQ_BUILD_DRAFT_API
93 if (options_.can_send_hello_msg && options_.hello_msg.size () > 0)
94 s = new (std::nothrow) hello_msg_session_t (
95 io_thread_, active_, socket_, options_, addr_);
96 else
97 s = new (std::nothrow) session_base_t (
98 io_thread_, active_, socket_, options_, addr_);
99
100 break;
101 #else
102 s = new (std::nothrow)
103 session_base_t (io_thread_, active_, socket_, options_, addr_);
104 break;
105 #endif
106
107 default:
108 errno = EINVAL;
109 return NULL;
110 }
111 alloc_assert (s);
112 return s;
113 }
114
session_base_t(class io_thread_t * io_thread_,bool active_,class socket_base_t * socket_,const options_t & options_,address_t * addr_)115 zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
116 bool active_,
117 class socket_base_t *socket_,
118 const options_t &options_,
119 address_t *addr_) :
120 own_t (io_thread_, options_),
121 io_object_t (io_thread_),
122 _active (active_),
123 _pipe (NULL),
124 _zap_pipe (NULL),
125 _incomplete_in (false),
126 _pending (false),
127 _engine (NULL),
128 _socket (socket_),
129 _io_thread (io_thread_),
130 _has_linger_timer (false),
131 _addr (addr_)
132 #ifdef ZMQ_HAVE_WSS
133 ,
134 _wss_hostname (options_.wss_hostname)
135 #endif
136 {
137 }
138
get_endpoint() const139 const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
140 {
141 return _engine->get_endpoint ();
142 }
143
~session_base_t()144 zmq::session_base_t::~session_base_t ()
145 {
146 zmq_assert (!_pipe);
147 zmq_assert (!_zap_pipe);
148
149 // If there's still a pending linger timer, remove it.
150 if (_has_linger_timer) {
151 cancel_timer (linger_timer_id);
152 _has_linger_timer = false;
153 }
154
155 // Close the engine.
156 if (_engine)
157 _engine->terminate ();
158
159 LIBZMQ_DELETE (_addr);
160 }
161
attach_pipe(pipe_t * pipe_)162 void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
163 {
164 zmq_assert (!is_terminating ());
165 zmq_assert (!_pipe);
166 zmq_assert (pipe_);
167 _pipe = pipe_;
168 _pipe->set_event_sink (this);
169 }
170
pull_msg(msg_t * msg_)171 int zmq::session_base_t::pull_msg (msg_t *msg_)
172 {
173 if (!_pipe || !_pipe->read (msg_)) {
174 errno = EAGAIN;
175 return -1;
176 }
177
178 _incomplete_in = (msg_->flags () & msg_t::more) != 0;
179
180 return 0;
181 }
182
push_msg(msg_t * msg_)183 int zmq::session_base_t::push_msg (msg_t *msg_)
184 {
185 // pass subscribe/cancel to the sockets
186 if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
187 && !msg_->is_cancel ())
188 return 0;
189 if (_pipe && _pipe->write (msg_)) {
190 const int rc = msg_->init ();
191 errno_assert (rc == 0);
192 return 0;
193 }
194
195 errno = EAGAIN;
196 return -1;
197 }
198
read_zap_msg(msg_t * msg_)199 int zmq::session_base_t::read_zap_msg (msg_t *msg_)
200 {
201 if (_zap_pipe == NULL) {
202 errno = ENOTCONN;
203 return -1;
204 }
205
206 if (!_zap_pipe->read (msg_)) {
207 errno = EAGAIN;
208 return -1;
209 }
210
211 return 0;
212 }
213
write_zap_msg(msg_t * msg_)214 int zmq::session_base_t::write_zap_msg (msg_t *msg_)
215 {
216 if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
217 errno = ENOTCONN;
218 return -1;
219 }
220
221 if ((msg_->flags () & msg_t::more) == 0)
222 _zap_pipe->flush ();
223
224 const int rc = msg_->init ();
225 errno_assert (rc == 0);
226 return 0;
227 }
228
reset()229 void zmq::session_base_t::reset ()
230 {
231 }
232
flush()233 void zmq::session_base_t::flush ()
234 {
235 if (_pipe)
236 _pipe->flush ();
237 }
238
rollback()239 void zmq::session_base_t::rollback ()
240 {
241 if (_pipe)
242 _pipe->rollback ();
243 }
244
clean_pipes()245 void zmq::session_base_t::clean_pipes ()
246 {
247 zmq_assert (_pipe != NULL);
248
249 // Get rid of half-processed messages in the out pipe. Flush any
250 // unflushed messages upstream.
251 _pipe->rollback ();
252 _pipe->flush ();
253
254 // Remove any half-read message from the in pipe.
255 while (_incomplete_in) {
256 msg_t msg;
257 int rc = msg.init ();
258 errno_assert (rc == 0);
259 rc = pull_msg (&msg);
260 errno_assert (rc == 0);
261 rc = msg.close ();
262 errno_assert (rc == 0);
263 }
264 }
265
pipe_terminated(pipe_t * pipe_)266 void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
267 {
268 // Drop the reference to the deallocated pipe if required.
269 zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
270 || _terminating_pipes.count (pipe_) == 1);
271
272 if (pipe_ == _pipe) {
273 // If this is our current pipe, remove it
274 _pipe = NULL;
275 if (_has_linger_timer) {
276 cancel_timer (linger_timer_id);
277 _has_linger_timer = false;
278 }
279 } else if (pipe_ == _zap_pipe)
280 _zap_pipe = NULL;
281 else
282 // Remove the pipe from the detached pipes set
283 _terminating_pipes.erase (pipe_);
284
285 if (!is_terminating () && options.raw_socket) {
286 if (_engine) {
287 _engine->terminate ();
288 _engine = NULL;
289 }
290 terminate ();
291 }
292
293 // If we are waiting for pending messages to be sent, at this point
294 // we are sure that there will be no more messages and we can proceed
295 // with termination safely.
296 if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
297 _pending = false;
298 own_t::process_term (0);
299 }
300 }
301
read_activated(pipe_t * pipe_)302 void zmq::session_base_t::read_activated (pipe_t *pipe_)
303 {
304 // Skip activating if we're detaching this pipe
305 if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
306 zmq_assert (_terminating_pipes.count (pipe_) == 1);
307 return;
308 }
309
310 if (unlikely (_engine == NULL)) {
311 if (_pipe)
312 _pipe->check_read ();
313 return;
314 }
315
316 if (likely (pipe_ == _pipe))
317 _engine->restart_output ();
318 else {
319 // i.e. pipe_ == zap_pipe
320 _engine->zap_msg_available ();
321 }
322 }
323
write_activated(pipe_t * pipe_)324 void zmq::session_base_t::write_activated (pipe_t *pipe_)
325 {
326 // Skip activating if we're detaching this pipe
327 if (_pipe != pipe_) {
328 zmq_assert (_terminating_pipes.count (pipe_) == 1);
329 return;
330 }
331
332 if (_engine)
333 _engine->restart_input ();
334 }
335
hiccuped(pipe_t *)336 void zmq::session_base_t::hiccuped (pipe_t *)
337 {
338 // Hiccups are always sent from session to socket, not the other
339 // way round.
340 zmq_assert (false);
341 }
342
get_socket() const343 zmq::socket_base_t *zmq::session_base_t::get_socket () const
344 {
345 return _socket;
346 }
347
process_plug()348 void zmq::session_base_t::process_plug ()
349 {
350 if (_active)
351 start_connecting (false);
352 }
353
354 // This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
355 // is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
356 // or it aborts on any other error. In other words, either ZAP is not
357 // configured or if it is configured it MUST be configured correctly and it
358 // MUST work, otherwise authentication cannot be guaranteed and it would be a
359 // security flaw.
zap_connect()360 int zmq::session_base_t::zap_connect ()
361 {
362 if (_zap_pipe != NULL)
363 return 0;
364
365 endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
366 if (peer.socket == NULL) {
367 errno = ECONNREFUSED;
368 return -1;
369 }
370 zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
371 || peer.options.type == ZMQ_SERVER);
372
373 // Create a bi-directional pipe that will connect
374 // session with zap socket.
375 object_t *parents[2] = {this, peer.socket};
376 pipe_t *new_pipes[2] = {NULL, NULL};
377 int hwms[2] = {0, 0};
378 bool conflates[2] = {false, false};
379 int rc = pipepair (parents, new_pipes, hwms, conflates);
380 errno_assert (rc == 0);
381
382 // Attach local end of the pipe to this socket object.
383 _zap_pipe = new_pipes[0];
384 _zap_pipe->set_nodelay ();
385 _zap_pipe->set_event_sink (this);
386
387 send_bind (peer.socket, new_pipes[1], false);
388
389 // Send empty routing id if required by the peer.
390 if (peer.options.recv_routing_id) {
391 msg_t id;
392 rc = id.init ();
393 errno_assert (rc == 0);
394 id.set_flags (msg_t::routing_id);
395 bool ok = _zap_pipe->write (&id);
396 zmq_assert (ok);
397 _zap_pipe->flush ();
398 }
399
400 return 0;
401 }
402
zap_enabled() const403 bool zmq::session_base_t::zap_enabled () const
404 {
405 return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
406 }
407
process_attach(i_engine * engine_)408 void zmq::session_base_t::process_attach (i_engine *engine_)
409 {
410 zmq_assert (engine_ != NULL);
411 zmq_assert (!_engine);
412 _engine = engine_;
413
414 if (!engine_->has_handshake_stage ())
415 engine_ready ();
416
417 // Plug in the engine.
418 _engine->plug (_io_thread, this);
419 }
420
engine_ready()421 void zmq::session_base_t::engine_ready ()
422 {
423 // Create the pipe if it does not exist yet.
424 if (!_pipe && !is_terminating ()) {
425 object_t *parents[2] = {this, _socket};
426 pipe_t *pipes[2] = {NULL, NULL};
427
428 const bool conflate = get_effective_conflate_option (options);
429
430 int hwms[2] = {conflate ? -1 : options.rcvhwm,
431 conflate ? -1 : options.sndhwm};
432 bool conflates[2] = {conflate, conflate};
433 const int rc = pipepair (parents, pipes, hwms, conflates);
434 errno_assert (rc == 0);
435
436 // Plug the local end of the pipe.
437 pipes[0]->set_event_sink (this);
438
439 // Remember the local end of the pipe.
440 zmq_assert (!_pipe);
441 _pipe = pipes[0];
442
443 // The endpoints strings are not set on bind, set them here so that
444 // events can use them.
445 pipes[0]->set_endpoint_pair (_engine->get_endpoint ());
446 pipes[1]->set_endpoint_pair (_engine->get_endpoint ());
447
448 // Ask socket to plug into the remote end of the pipe.
449 send_bind (_socket, pipes[1]);
450 }
451 }
452
engine_error(bool handshaked_,zmq::i_engine::error_reason_t reason_)453 void zmq::session_base_t::engine_error (bool handshaked_,
454 zmq::i_engine::error_reason_t reason_)
455 {
456 // Engine is dead. Let's forget about it.
457 _engine = NULL;
458
459 // Remove any half-done messages from the pipes.
460 if (_pipe) {
461 clean_pipes ();
462
463 #ifdef ZMQ_BUILD_DRAFT_API
464 // Only send disconnect message if socket was accepted and handshake was completed
465 if (!_active && handshaked_ && options.can_recv_disconnect_msg
466 && !options.disconnect_msg.empty ()) {
467 _pipe->set_disconnect_msg (options.disconnect_msg);
468 _pipe->send_disconnect_msg ();
469 }
470 #endif
471 }
472
473 zmq_assert (reason_ == i_engine::connection_error
474 || reason_ == i_engine::timeout_error
475 || reason_ == i_engine::protocol_error);
476
477 switch (reason_) {
478 case i_engine::timeout_error:
479 /* FALLTHROUGH */
480 case i_engine::connection_error:
481 if (_active) {
482 reconnect ();
483 break;
484 }
485
486 case i_engine::protocol_error:
487 if (_pending) {
488 if (_pipe)
489 _pipe->terminate (false);
490 if (_zap_pipe)
491 _zap_pipe->terminate (false);
492 } else {
493 terminate ();
494 }
495 break;
496 }
497
498 // Just in case there's only a delimiter in the pipe.
499 if (_pipe)
500 _pipe->check_read ();
501
502 if (_zap_pipe)
503 _zap_pipe->check_read ();
504 }
505
process_term(int linger_)506 void zmq::session_base_t::process_term (int linger_)
507 {
508 zmq_assert (!_pending);
509
510 // If the termination of the pipe happens before the term command is
511 // delivered there's nothing much to do. We can proceed with the
512 // standard termination immediately.
513 if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
514 own_t::process_term (0);
515 return;
516 }
517
518 _pending = true;
519
520 if (_pipe != NULL) {
521 // If there's finite linger value, delay the termination.
522 // If linger is infinite (negative) we don't even have to set
523 // the timer.
524 if (linger_ > 0) {
525 zmq_assert (!_has_linger_timer);
526 add_timer (linger_, linger_timer_id);
527 _has_linger_timer = true;
528 }
529
530 // Start pipe termination process. Delay the termination till all messages
531 // are processed in case the linger time is non-zero.
532 _pipe->terminate (linger_ != 0);
533
534 // TODO: Should this go into pipe_t::terminate ?
535 // In case there's no engine and there's only delimiter in the
536 // pipe it wouldn't be ever read. Thus we check for it explicitly.
537 if (!_engine)
538 _pipe->check_read ();
539 }
540
541 if (_zap_pipe != NULL)
542 _zap_pipe->terminate (false);
543 }
544
timer_event(int id_)545 void zmq::session_base_t::timer_event (int id_)
546 {
547 // Linger period expired. We can proceed with termination even though
548 // there are still pending messages to be sent.
549 zmq_assert (id_ == linger_timer_id);
550 _has_linger_timer = false;
551
552 // Ask pipe to terminate even though there may be pending messages in it.
553 zmq_assert (_pipe);
554 _pipe->terminate (false);
555 }
556
process_conn_failed()557 void zmq::session_base_t::process_conn_failed ()
558 {
559 std::string *ep = new (std::string);
560 _addr->to_string (*ep);
561 send_term_endpoint (_socket, ep);
562 }
563
reconnect()564 void zmq::session_base_t::reconnect ()
565 {
566 // For delayed connect situations, terminate the pipe
567 // and reestablish later on
568 if (_pipe && options.immediate == 1
569 #ifdef ZMQ_HAVE_OPENPGM
570 && _addr->protocol != protocol_name::pgm
571 && _addr->protocol != protocol_name::epgm
572 #endif
573 #ifdef ZMQ_HAVE_NORM
574 && _addr->protocol != protocol_name::norm
575 #endif
576 && _addr->protocol != protocol_name::udp) {
577 _pipe->hiccup ();
578 _pipe->terminate (false);
579 _terminating_pipes.insert (_pipe);
580 _pipe = NULL;
581
582 if (_has_linger_timer) {
583 cancel_timer (linger_timer_id);
584 _has_linger_timer = false;
585 }
586 }
587
588 reset ();
589
590 // Reconnect.
591 if (options.reconnect_ivl > 0)
592 start_connecting (true);
593 else {
594 std::string *ep = new (std::string);
595 _addr->to_string (*ep);
596 send_term_endpoint (_socket, ep);
597 }
598
599 // For subscriber sockets we hiccup the inbound pipe, which will cause
600 // the socket object to resend all the subscriptions.
601 if (_pipe
602 && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
603 || options.type == ZMQ_DISH))
604 _pipe->hiccup ();
605 }
606
start_connecting(bool wait_)607 void zmq::session_base_t::start_connecting (bool wait_)
608 {
609 zmq_assert (_active);
610
611 // Choose I/O thread to run connecter in. Given that we are already
612 // running in an I/O thread, there must be at least one available.
613 io_thread_t *io_thread = choose_io_thread (options.affinity);
614 zmq_assert (io_thread);
615
616 // Create the connecter object.
617 own_t *connecter = NULL;
618 if (_addr->protocol == protocol_name::tcp) {
619 if (!options.socks_proxy_address.empty ()) {
620 address_t *proxy_address = new (std::nothrow)
621 address_t (protocol_name::tcp, options.socks_proxy_address,
622 this->get_ctx ());
623 alloc_assert (proxy_address);
624 connecter = new (std::nothrow) socks_connecter_t (
625 io_thread, this, options, _addr, proxy_address, wait_);
626 alloc_assert (connecter);
627 if (!options.socks_proxy_username.empty ()) {
628 reinterpret_cast<socks_connecter_t *> (connecter)
629 ->set_auth_method_basic (options.socks_proxy_username,
630 options.socks_proxy_password);
631 }
632 } else {
633 connecter = new (std::nothrow)
634 tcp_connecter_t (io_thread, this, options, _addr, wait_);
635 }
636 }
637 #if defined ZMQ_HAVE_IPC
638 else if (_addr->protocol == protocol_name::ipc) {
639 connecter = new (std::nothrow)
640 ipc_connecter_t (io_thread, this, options, _addr, wait_);
641 }
642 #endif
643 #if defined ZMQ_HAVE_TIPC
644 else if (_addr->protocol == protocol_name::tipc) {
645 connecter = new (std::nothrow)
646 tipc_connecter_t (io_thread, this, options, _addr, wait_);
647 }
648 #endif
649 #if defined ZMQ_HAVE_VMCI
650 else if (_addr->protocol == protocol_name::vmci) {
651 connecter = new (std::nothrow)
652 vmci_connecter_t (io_thread, this, options, _addr, wait_);
653 }
654 #endif
655 #if defined ZMQ_HAVE_WS
656 else if (_addr->protocol == protocol_name::ws) {
657 connecter = new (std::nothrow) ws_connecter_t (
658 io_thread, this, options, _addr, wait_, false, std::string ());
659 }
660 #endif
661 #if defined ZMQ_HAVE_WSS
662 else if (_addr->protocol == protocol_name::wss) {
663 connecter = new (std::nothrow) ws_connecter_t (
664 io_thread, this, options, _addr, wait_, true, _wss_hostname);
665 }
666 #endif
667 if (connecter != NULL) {
668 alloc_assert (connecter);
669 launch_child (connecter);
670 return;
671 }
672
673 if (_addr->protocol == protocol_name::udp) {
674 zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
675 || options.type == ZMQ_DGRAM);
676
677 udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
678 alloc_assert (engine);
679
680 bool recv = false;
681 bool send = false;
682
683 if (options.type == ZMQ_RADIO) {
684 send = true;
685 recv = false;
686 } else if (options.type == ZMQ_DISH) {
687 send = false;
688 recv = true;
689 } else if (options.type == ZMQ_DGRAM) {
690 send = true;
691 recv = true;
692 }
693
694 int rc = engine->init (_addr, send, recv);
695 errno_assert (rc == 0);
696
697 send_attach (this, engine);
698
699 return;
700 }
701
702 #ifdef ZMQ_HAVE_OPENPGM
703
704 // Both PGM and EPGM transports are using the same infrastructure.
705 if (_addr->protocol == "pgm" || _addr->protocol == "epgm") {
706 zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
707 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
708
709 // For EPGM transport with UDP encapsulation of PGM is used.
710 bool const udp_encapsulation = _addr->protocol == "epgm";
711
712 // At this point we'll create message pipes to the session straight
713 // away. There's no point in delaying it as no concept of 'connect'
714 // exists with PGM anyway.
715 if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
716 // PGM sender.
717 pgm_sender_t *pgm_sender =
718 new (std::nothrow) pgm_sender_t (io_thread, options);
719 alloc_assert (pgm_sender);
720
721 int rc =
722 pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
723 errno_assert (rc == 0);
724
725 send_attach (this, pgm_sender);
726 } else {
727 // PGM receiver.
728 pgm_receiver_t *pgm_receiver =
729 new (std::nothrow) pgm_receiver_t (io_thread, options);
730 alloc_assert (pgm_receiver);
731
732 int rc =
733 pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
734 errno_assert (rc == 0);
735
736 send_attach (this, pgm_receiver);
737 }
738
739 return;
740 }
741 #endif
742
743 #ifdef ZMQ_HAVE_NORM
744 if (_addr->protocol == "norm") {
745 // At this point we'll create message pipes to the session straight
746 // away. There's no point in delaying it as no concept of 'connect'
747 // exists with NORM anyway.
748 if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
749 // NORM sender.
750 norm_engine_t *norm_sender =
751 new (std::nothrow) norm_engine_t (io_thread, options);
752 alloc_assert (norm_sender);
753
754 int rc = norm_sender->init (_addr->address.c_str (), true, false);
755 errno_assert (rc == 0);
756
757 send_attach (this, norm_sender);
758 } else { // ZMQ_SUB or ZMQ_XSUB
759
760 // NORM receiver.
761 norm_engine_t *norm_receiver =
762 new (std::nothrow) norm_engine_t (io_thread, options);
763 alloc_assert (norm_receiver);
764
765 int rc = norm_receiver->init (_addr->address.c_str (), false, true);
766 errno_assert (rc == 0);
767
768 send_attach (this, norm_receiver);
769 }
770 return;
771 }
772 #endif // ZMQ_HAVE_NORM
773
774 zmq_assert (false);
775 }
776
hello_msg_session_t(io_thread_t * io_thread_,bool connect_,socket_base_t * socket_,const options_t & options_,address_t * addr_)777 zmq::hello_msg_session_t::hello_msg_session_t (io_thread_t *io_thread_,
778 bool connect_,
779 socket_base_t *socket_,
780 const options_t &options_,
781 address_t *addr_) :
782 session_base_t (io_thread_, connect_, socket_, options_, addr_),
783 _new_pipe (true)
784 {
785 }
786
~hello_msg_session_t()787 zmq::hello_msg_session_t::~hello_msg_session_t ()
788 {
789 }
790
791
pull_msg(msg_t * msg_)792 int zmq::hello_msg_session_t::pull_msg (msg_t *msg_)
793 {
794 if (_new_pipe) {
795 _new_pipe = false;
796
797 const int rc =
798 msg_->init_buffer (&options.hello_msg[0], options.hello_msg.size ());
799 errno_assert (rc == 0);
800
801 return 0;
802 }
803
804 return session_base_t::pull_msg (msg_);
805 }
806
reset()807 void zmq::hello_msg_session_t::reset ()
808 {
809 session_base_t::reset ();
810 _new_pipe = true;
811 }
812