1 /*
2     Copyright (c) 2007-2010 iMatix Corporation
3 
4     This file is part of 0MQ.
5 
6     0MQ is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License as published by
8     the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     0MQ is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU Lesser General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include "platform.hpp"
21 
22 #if defined ZMQ_HAVE_OPENPGM
23 
24 #ifdef ZMQ_HAVE_WINDOWS
25 #include "windows.hpp"
26 #endif
27 
28 #include <stdlib.h>
29 
30 #include "io_thread.hpp"
31 #include "pgm_sender.hpp"
32 #include "err.hpp"
33 #include "wire.hpp"
34 #include "stdint.hpp"
35 
pgm_sender_t(io_thread_t * parent_,const options_t & options_)36 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
37       const options_t &options_) :
38     io_object_t (parent_),
39     has_tx_timer (false),
40     has_rx_timer (false),
41     encoder (0),
42     pgm_socket (false, options_),
43     options (options_),
44     out_buffer (NULL),
45     out_buffer_size (0),
46     write_size (0)
47 {
48 }
49 
init(bool udp_encapsulation_,const char * network_)50 int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
51 {
52     int rc = pgm_socket.init (udp_encapsulation_, network_);
53     if (rc != 0)
54         return rc;
55 
56     out_buffer_size = pgm_socket.get_max_tsdu_size ();
57     out_buffer = (unsigned char*) malloc (out_buffer_size);
58     alloc_assert (out_buffer);
59 
60     return rc;
61 }
62 
plug(io_thread_t * io_thread_,i_inout * inout_)63 void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
64 {
65     //  Alocate 2 fds for PGM socket.
66     int downlink_socket_fd = 0;
67     int uplink_socket_fd = 0;
68     int rdata_notify_fd = 0;
69     int pending_notify_fd = 0;
70 
71     encoder.set_inout (inout_);
72 
73     //  Fill fds from PGM transport and add them to the poller.
74     pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
75         &rdata_notify_fd, &pending_notify_fd);
76 
77     handle = add_fd (downlink_socket_fd);
78     uplink_handle = add_fd (uplink_socket_fd);
79     rdata_notify_handle = add_fd (rdata_notify_fd);
80     pending_notify_handle = add_fd (pending_notify_fd);
81 
82     //  Set POLLIN. We wont never want to stop polling for uplink = we never
83     //  want to stop porocess NAKs.
84     set_pollin (uplink_handle);
85     set_pollin (rdata_notify_handle);
86     set_pollin (pending_notify_handle);
87 
88     //  Set POLLOUT for downlink_socket_handle.
89     set_pollout (handle);
90 }
91 
unplug()92 void zmq::pgm_sender_t::unplug ()
93 {
94     if (has_rx_timer) {
95         cancel_timer (rx_timer_id);
96         has_rx_timer = false;
97     }
98 
99     if (has_tx_timer) {
100         cancel_timer (tx_timer_id);
101         has_tx_timer = false;
102     }
103 
104     rm_fd (handle);
105     rm_fd (uplink_handle);
106     rm_fd (rdata_notify_handle);
107     rm_fd (pending_notify_handle);
108     encoder.set_inout (NULL);
109 }
110 
terminate()111 void zmq::pgm_sender_t::terminate ()
112 {
113     unplug ();
114     delete this;
115 }
116 
activate_out()117 void zmq::pgm_sender_t::activate_out ()
118 {
119     set_pollout (handle);
120     out_event ();
121 }
122 
activate_in()123 void zmq::pgm_sender_t::activate_in ()
124 {
125     zmq_assert (false);
126 }
127 
~pgm_sender_t()128 zmq::pgm_sender_t::~pgm_sender_t ()
129 {
130     if (out_buffer) {
131         free (out_buffer);
132         out_buffer = NULL;
133     }
134 }
135 
in_event()136 void zmq::pgm_sender_t::in_event ()
137 {
138     if (has_rx_timer) {
139         cancel_timer (rx_timer_id);
140         has_rx_timer = false;
141     }
142 
143     //  In-event on sender side means NAK or SPMR receiving from some peer.
144     pgm_socket.process_upstream ();
145     if (errno == ENOMEM || errno == EBUSY) {
146         const long timeout = pgm_socket.get_rx_timeout ();
147         add_timer (timeout, rx_timer_id);
148         has_rx_timer = true;
149     }
150 }
151 
out_event()152 void zmq::pgm_sender_t::out_event ()
153 {
154     //  POLLOUT event from send socket. If write buffer is empty,
155     //  try to read new data from the encoder.
156     if (write_size == 0) {
157 
158         //  First two bytes (sizeof uint16_t) are used to store message
159         //  offset in following steps. Note that by passing our buffer to
160         //  the get data function we prevent it from returning its own buffer.
161         unsigned char *bf = out_buffer + sizeof (uint16_t);
162         size_t bfsz = out_buffer_size - sizeof (uint16_t);
163         int offset = -1;
164         encoder.get_data (&bf, &bfsz, &offset);
165 
166         //  If there are no data to write stop polling for output.
167         if (!bfsz) {
168             reset_pollout (handle);
169             return;
170         }
171 
172         //  Put offset information in the buffer.
173         write_size = bfsz + sizeof (uint16_t);
174         put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
175     }
176 
177     if (has_tx_timer) {
178         cancel_timer (tx_timer_id);
179         has_tx_timer = false;
180     }
181 
182     //  Send the data.
183     size_t nbytes = pgm_socket.send (out_buffer, write_size);
184 
185     //  We can write either all data or 0 which means rate limit reached.
186     if (nbytes == write_size) {
187         write_size = 0;
188     } else {
189         zmq_assert (nbytes == 0);
190 
191         if (errno == ENOMEM) {
192             const long timeout = pgm_socket.get_tx_timeout ();
193             add_timer (timeout, tx_timer_id);
194             has_tx_timer = true;
195         } else
196             zmq_assert (errno == EBUSY);
197     }
198 }
199 
timer_event(int token)200 void zmq::pgm_sender_t::timer_event (int token)
201 {
202     //  Timer cancels on return by poller_base.
203     if (token == rx_timer_id) {
204         has_rx_timer = false;
205         in_event ();
206     } else if (token == tx_timer_id) {
207         has_tx_timer = false;
208         out_event ();
209     } else
210         zmq_assert (false);
211 }
212 
213 #endif
214 
215