1 /*
2 Copyright (c) 2007-2011 iMatix Corporation
3 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4
5 This file is part of 0MQ.
6
7 0MQ is free software; you can redistribute it and/or modify it under
8 the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
11
12 0MQ is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "platform.hpp"
22
23 #if defined ZMQ_HAVE_OPENPGM
24
25 #include <new>
26
27 #ifdef ZMQ_HAVE_WINDOWS
28 #include "windows.hpp"
29 #endif
30
31 #include "pgm_receiver.hpp"
32 #include "err.hpp"
33 #include "stdint.hpp"
34 #include "wire.hpp"
35 #include "i_inout.hpp"
36
pgm_receiver_t(class io_thread_t * parent_,const options_t & options_)37 zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
38 const options_t &options_) :
39 io_object_t (parent_),
40 has_rx_timer (false),
41 pgm_socket (true, options_),
42 options (options_),
43 inout (NULL),
44 mru_decoder (NULL),
45 pending_bytes (0)
46 {
47 }
48
~pgm_receiver_t()49 zmq::pgm_receiver_t::~pgm_receiver_t ()
50 {
51 // Destructor should not be called before unplug.
52 zmq_assert (peers.empty ());
53 }
54
init(bool udp_encapsulation_,const char * network_)55 int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
56 {
57 return pgm_socket.init (udp_encapsulation_, network_);
58 }
59
plug(io_thread_t * io_thread_,i_inout * inout_)60 void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
61 {
62 // Retrieve PGM fds and start polling.
63 fd_t socket_fd = retired_fd;
64 fd_t waiting_pipe_fd = retired_fd;
65 pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
66 socket_handle = add_fd (socket_fd);
67 pipe_handle = add_fd (waiting_pipe_fd);
68 set_pollin (pipe_handle);
69 set_pollin (socket_handle);
70
71 inout = inout_;
72 }
73
unplug()74 void zmq::pgm_receiver_t::unplug ()
75 {
76 // Delete decoders.
77 for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
78 if (it->second.decoder != NULL)
79 delete it->second.decoder;
80 }
81 peers.clear ();
82
83 mru_decoder = NULL;
84 pending_bytes = 0;
85
86 if (has_rx_timer) {
87 cancel_timer (rx_timer_id);
88 has_rx_timer = false;
89 }
90
91 rm_fd (socket_handle);
92 rm_fd (pipe_handle);
93
94 inout = NULL;
95 }
96
terminate()97 void zmq::pgm_receiver_t::terminate ()
98 {
99 unplug ();
100 delete this;
101 }
102
activate_out()103 void zmq::pgm_receiver_t::activate_out ()
104 {
105 zmq_assert (false);
106 }
107
activate_in()108 void zmq::pgm_receiver_t::activate_in ()
109 {
110 // It is possible that the most recently used decoder
111 // processed the whole buffer but failed to write
112 // the last message into the pipe.
113 if (pending_bytes == 0) {
114 if (mru_decoder != NULL)
115 mru_decoder->process_buffer (NULL, 0);
116 return;
117 }
118
119 zmq_assert (mru_decoder != NULL);
120 zmq_assert (pending_ptr != NULL);
121
122 // Ask the decoder to process remaining data.
123 size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes);
124 pending_bytes -= n;
125
126 if (pending_bytes > 0)
127 return;
128
129 // Resume polling.
130 set_pollin (pipe_handle);
131 set_pollin (socket_handle);
132
133 in_event ();
134 }
135
in_event()136 void zmq::pgm_receiver_t::in_event ()
137 {
138 // Read data from the underlying pgm_socket.
139 unsigned char *data = NULL;
140 const pgm_tsi_t *tsi = NULL;
141
142 zmq_assert (pending_bytes == 0);
143
144 if (has_rx_timer) {
145 cancel_timer (rx_timer_id);
146 has_rx_timer = false;
147 }
148
149 // TODO: This loop can effectively block other engines in the same I/O
150 // thread in the case of high load.
151 while (true) {
152
153 // Get new batch of data.
154 // Note the workaround made not to break strict-aliasing rules.
155 void *tmp = NULL;
156 ssize_t received = pgm_socket.receive (&tmp, &tsi);
157 data = (unsigned char*) tmp;
158
159 // No data to process. This may happen if the packet received is
160 // neither ODATA nor ODATA.
161 if (received == 0) {
162 if (errno == ENOMEM || errno == EBUSY) {
163 const long timeout = pgm_socket.get_rx_timeout ();
164 add_timer (timeout, rx_timer_id);
165 has_rx_timer = true;
166 }
167 break;
168 }
169
170 // Find the peer based on its TSI.
171 peers_t::iterator it = peers.find (*tsi);
172
173 // Data loss. Delete decoder and mark the peer as disjoint.
174 if (received == -1) {
175 if (it != peers.end ()) {
176 it->second.joined = false;
177 if (it->second.decoder == mru_decoder)
178 mru_decoder = NULL;
179 if (it->second.decoder != NULL) {
180 delete it->second.decoder;
181 it->second.decoder = NULL;
182 }
183 }
184 break;
185 }
186
187 // New peer. Add it to the list of know but unjoint peers.
188 if (it == peers.end ()) {
189 peer_info_t peer_info = {false, NULL};
190 it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
191 }
192
193 // Read the offset of the fist message in the current packet.
194 zmq_assert ((size_t) received >= sizeof (uint16_t));
195 uint16_t offset = get_uint16 (data);
196 data += sizeof (uint16_t);
197 received -= sizeof (uint16_t);
198
199 // Join the stream if needed.
200 if (!it->second.joined) {
201
202 // There is no beginning of the message in current packet.
203 // Ignore the data.
204 if (offset == 0xffff)
205 continue;
206
207 zmq_assert (offset <= received);
208 zmq_assert (it->second.decoder == NULL);
209
210 // We have to move data to the begining of the first message.
211 data += offset;
212 received -= offset;
213
214 // Mark the stream as joined.
215 it->second.joined = true;
216
217 // Create and connect decoder for the peer.
218 it->second.decoder = new (std::nothrow) decoder_t (0);
219 alloc_assert (it->second.decoder);
220 it->second.decoder->set_inout (inout);
221 }
222
223 mru_decoder = it->second.decoder;
224
225 // Push all the data to the decoder.
226 ssize_t processed = it->second.decoder->process_buffer (data, received);
227 if (processed < received) {
228 // Save some state so we can resume the decoding process later.
229 pending_bytes = received - processed;
230 pending_ptr = data + processed;
231 // Stop polling.
232 reset_pollin (pipe_handle);
233 reset_pollin (socket_handle);
234
235 // Reset outstanding timer.
236 if (has_rx_timer) {
237 cancel_timer (rx_timer_id);
238 has_rx_timer = false;
239 }
240
241 break;
242 }
243 }
244
245 // Flush any messages decoder may have produced.
246 inout->flush ();
247 }
248
timer_event(int token)249 void zmq::pgm_receiver_t::timer_event (int token)
250 {
251 zmq_assert (token == rx_timer_id);
252
253 // Timer cancels on return by poller_base.
254 has_rx_timer = false;
255 in_event ();
256 }
257
258 #endif
259
260