/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
#define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__

#if defined ZMQ_HAVE_OPENPGM

//  memcpy is used by tsi_comp below; include it directly instead of
//  relying on a transitive include.
#include <string.h>

#include <map>
#include <algorithm>

#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v1_decoder.hpp"
#include "pgm_socket.hpp"

namespace zmq
{
class io_thread_t;
class session_base_t;

//  I/O object and engine implementation that owns a PGM socket and keeps
//  one decoder per peer, keyed by the peer's TSI.
class pgm_receiver_t ZMQ_FINAL : public io_object_t, public i_engine
{
  public:
    pgm_receiver_t (zmq::io_thread_t *parent_, const options_t &options_);
    ~pgm_receiver_t ();

    //  NOTE(review): presumably sets up the underlying PGM socket for
    //  'network_', optionally with UDP encapsulation; the definition is not
    //  in this header - confirm against pgm_receiver.cpp.
    int init (bool udp_encapsulation_, const char *network_);

    //  i_engine interface implementation.

    //  Always reports that this engine has no handshake stage.
    bool has_handshake_stage () { return false; }
    void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
    void terminate ();
    bool restart_input ();
    void restart_output ();
    //  Intentionally a no-op for this engine.
    void zap_msg_available () {}
    const endpoint_uri_pair_t &get_endpoint () const;

    //  i_poll_events interface implementation.
    void in_event ();
    void timer_event (int token);

  private:
    //  Unplug the engine from the session.
    void unplug ();

    //  Decode received data (inpos, insize) and forward decoded
    //  messages to the session.
    int process_input (v1_decoder_t *decoder);

    //  PGM is not able to move subscriptions upstream. Thus, drop all
    //  the pending subscriptions.
    void drop_subscriptions ();

    //  RX timeout timer ID.
    enum
    {
        rx_timer_id = 0xa1
    };

    //  NOTE(review): presumably the value handed out by get_endpoint () -
    //  confirm against pgm_receiver.cpp.
    const endpoint_uri_pair_t _empty_endpoint;

    //  RX timer is running.
    bool has_rx_timer;

    //  If joined is true we are already getting messages from the peer.
    //  If it's false, we are getting data but still we haven't seen
    //  beginning of a message.
    struct peer_info_t
    {
        bool joined;
        v1_decoder_t *decoder;
    };

    //  Strict weak ordering over TSIs so that they can be used as std::map
    //  keys. The first sizeof (uint32_t[2]) bytes of each TSI are copied
    //  into two 32-bit words which are then compared lexicographically.
    struct tsi_comp
    {
        bool operator() (const pgm_tsi_t &ltsi, const pgm_tsi_t &rtsi) const
        {
            //  The bytes are copied out rather than read through a cast
            //  pointer. NOTE(review): this assumes sizeof (pgm_tsi_t) is at
            //  least sizeof (ll) - confirm against the OpenPGM headers.
            uint32_t ll[2], rl[2];
            memcpy (ll, &ltsi, sizeof (ll));
            memcpy (rl, &rtsi, sizeof (rl));
            return (ll[0] < rl[0]) || (ll[0] == rl[0] && ll[1] < rl[1]);
        }
    };

    //  Per-peer state, keyed by TSI.
    typedef std::map<pgm_tsi_t, peer_info_t, tsi_comp> peers_t;
    peers_t peers;

    //  PGM socket.
    pgm_socket_t pgm_socket;

    //  Socket options.
    options_t options;

    //  Associated session.
    zmq::session_base_t *session;

    //  NOTE(review): presumably the TSI of the peer whose data is still
    //  pending in (inpos, insize) - confirm against pgm_receiver.cpp.
    const pgm_tsi_t *active_tsi;

    //  Number of bytes not consumed by the decoder due to pipe overflow.
    size_t insize;

    //  Pointer to data still waiting to be processed by the decoder.
    const unsigned char *inpos;

    //  Poll handle associated with PGM socket.
    handle_t socket_handle;

    //  Poll handle associated with engine PGM waiting pipe.
    handle_t pipe_handle;

    ZMQ_NON_COPYABLE_NOR_MOVABLE (pgm_receiver_t)
};
}

#endif

#endif