1 /*
2     Copyright (c) 2007-2011 iMatix Corporation
3     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4 
5     This file is part of 0MQ.
6 
7     0MQ is free software; you can redistribute it and/or modify it under
8     the terms of the GNU Lesser General Public License as published by
9     the Free Software Foundation; either version 3 of the License, or
10     (at your option) any later version.
11 
12     0MQ is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU Lesser General Public License for more details.
16 
17     You should have received a copy of the GNU Lesser General Public License
18     along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include "platform.hpp"
22 
23 #if defined ZMQ_HAVE_OPENPGM
24 
25 #include <new>
26 
27 #ifdef ZMQ_HAVE_WINDOWS
28 #include "windows.hpp"
29 #endif
30 
31 #include "pgm_receiver.hpp"
32 #include "err.hpp"
33 #include "stdint.hpp"
34 #include "wire.hpp"
35 #include "i_inout.hpp"
36 
pgm_receiver_t(class io_thread_t * parent_,const options_t & options_)37 zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
38       const options_t &options_) :
39     io_object_t (parent_),
40     has_rx_timer (false),
41     pgm_socket (true, options_),
42     options (options_),
43     inout (NULL),
44     mru_decoder (NULL),
45     pending_bytes (0)
46 {
47 }
48 
~pgm_receiver_t()49 zmq::pgm_receiver_t::~pgm_receiver_t ()
50 {
51     //  Destructor should not be called before unplug.
52     zmq_assert (peers.empty ());
53 }
54 
init(bool udp_encapsulation_,const char * network_)55 int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
56 {
57     return pgm_socket.init (udp_encapsulation_, network_);
58 }
59 
plug(io_thread_t * io_thread_,i_inout * inout_)60 void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
61 {
62     //  Retrieve PGM fds and start polling.
63     fd_t socket_fd = retired_fd;
64     fd_t waiting_pipe_fd = retired_fd;
65     pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
66     socket_handle = add_fd (socket_fd);
67     pipe_handle = add_fd (waiting_pipe_fd);
68     set_pollin (pipe_handle);
69     set_pollin (socket_handle);
70 
71     inout = inout_;
72 }
73 
unplug()74 void zmq::pgm_receiver_t::unplug ()
75 {
76     //  Delete decoders.
77     for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
78         if (it->second.decoder != NULL)
79             delete it->second.decoder;
80     }
81     peers.clear ();
82 
83     mru_decoder = NULL;
84     pending_bytes = 0;
85 
86     if (has_rx_timer) {
87         cancel_timer (rx_timer_id);
88         has_rx_timer = false;
89     }
90 
91     rm_fd (socket_handle);
92     rm_fd (pipe_handle);
93 
94     inout = NULL;
95 }
96 
terminate()97 void zmq::pgm_receiver_t::terminate ()
98 {
99     unplug ();
100     delete this;
101 }
102 
activate_out()103 void zmq::pgm_receiver_t::activate_out ()
104 {
105     zmq_assert (false);
106 }
107 
activate_in()108 void zmq::pgm_receiver_t::activate_in ()
109 {
110     //  It is possible that the most recently used decoder
111     //  processed the whole buffer but failed to write
112     //  the last message into the pipe.
113     if (pending_bytes == 0) {
114         if (mru_decoder != NULL)
115             mru_decoder->process_buffer (NULL, 0);
116         return;
117     }
118 
119     zmq_assert (mru_decoder != NULL);
120     zmq_assert (pending_ptr != NULL);
121 
122     //  Ask the decoder to process remaining data.
123     size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes);
124     pending_bytes -= n;
125 
126     if (pending_bytes > 0)
127         return;
128 
129     //  Resume polling.
130     set_pollin (pipe_handle);
131     set_pollin (socket_handle);
132 
133     in_event ();
134 }
135 
in_event()136 void zmq::pgm_receiver_t::in_event ()
137 {
138     // Read data from the underlying pgm_socket.
139     unsigned char *data = NULL;
140     const pgm_tsi_t *tsi = NULL;
141 
142     zmq_assert (pending_bytes == 0);
143 
144     if (has_rx_timer) {
145         cancel_timer (rx_timer_id);
146         has_rx_timer = false;
147     }
148 
149     //  TODO: This loop can effectively block other engines in the same I/O
150     //  thread in the case of high load.
151     while (true) {
152 
153         //  Get new batch of data.
154         //  Note the workaround made not to break strict-aliasing rules.
155         void *tmp = NULL;
156         ssize_t received = pgm_socket.receive (&tmp, &tsi);
157         data = (unsigned char*) tmp;
158 
159         //  No data to process. This may happen if the packet received is
160         //  neither ODATA nor ODATA.
161         if (received == 0) {
162             if (errno == ENOMEM || errno == EBUSY) {
163                 const long timeout = pgm_socket.get_rx_timeout ();
164                 add_timer (timeout, rx_timer_id);
165                 has_rx_timer = true;
166             }
167             break;
168         }
169 
170         //  Find the peer based on its TSI.
171         peers_t::iterator it = peers.find (*tsi);
172 
173         //  Data loss. Delete decoder and mark the peer as disjoint.
174         if (received == -1) {
175             if (it != peers.end ()) {
176                 it->second.joined = false;
177                 if (it->second.decoder == mru_decoder)
178                     mru_decoder = NULL;
179                 if (it->second.decoder != NULL) {
180                     delete it->second.decoder;
181                     it->second.decoder = NULL;
182                 }
183             }
184             break;
185         }
186 
187         //  New peer. Add it to the list of know but unjoint peers.
188         if (it == peers.end ()) {
189             peer_info_t peer_info = {false, NULL};
190             it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
191         }
192 
193         //  Read the offset of the fist message in the current packet.
194         zmq_assert ((size_t) received >= sizeof (uint16_t));
195         uint16_t offset = get_uint16 (data);
196         data += sizeof (uint16_t);
197         received -= sizeof (uint16_t);
198 
199         //  Join the stream if needed.
200         if (!it->second.joined) {
201 
202             //  There is no beginning of the message in current packet.
203             //  Ignore the data.
204             if (offset == 0xffff)
205                 continue;
206 
207             zmq_assert (offset <= received);
208             zmq_assert (it->second.decoder == NULL);
209 
210             //  We have to move data to the begining of the first message.
211             data += offset;
212             received -= offset;
213 
214             //  Mark the stream as joined.
215             it->second.joined = true;
216 
217             //  Create and connect decoder for the peer.
218             it->second.decoder = new (std::nothrow) decoder_t (0);
219             alloc_assert (it->second.decoder);
220             it->second.decoder->set_inout (inout);
221         }
222 
223         mru_decoder = it->second.decoder;
224 
225         //  Push all the data to the decoder.
226         ssize_t processed = it->second.decoder->process_buffer (data, received);
227         if (processed < received) {
228             //  Save some state so we can resume the decoding process later.
229             pending_bytes = received - processed;
230             pending_ptr = data + processed;
231             //  Stop polling.
232             reset_pollin (pipe_handle);
233             reset_pollin (socket_handle);
234 
235             //  Reset outstanding timer.
236             if (has_rx_timer) {
237                 cancel_timer (rx_timer_id);
238                 has_rx_timer = false;
239             }
240 
241             break;
242         }
243     }
244 
245     //  Flush any messages decoder may have produced.
246     inout->flush ();
247 }
248 
timer_event(int token)249 void zmq::pgm_receiver_t::timer_event (int token)
250 {
251     zmq_assert (token == rx_timer_id);
252 
253     //  Timer cancels on return by poller_base.
254     has_rx_timer = false;
255     in_event ();
256 }
257 
258 #endif
259 
260