1 /* -*- c++ -*- */
2 /*
3  * Copyright 2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include "tcp_connection.h"
28 #include <gnuradio/basic_block.h>
29 #include <gnuradio/blocks/pdu.h>
30 
31 namespace gr {
32 namespace blocks {
33 
make(boost::asio::io_service & io_service,int MTU,bool no_delay)34 tcp_connection::sptr tcp_connection::make(boost::asio::io_service& io_service,
35                                           int MTU /*= 10000*/,
36                                           bool no_delay /*=false*/)
37 {
38     return sptr(new tcp_connection(io_service, MTU, no_delay));
39 }
40 
tcp_connection(boost::asio::io_service & io_service,int MTU,bool no_delay)41 tcp_connection::tcp_connection(boost::asio::io_service& io_service,
42                                int MTU /*= 10000*/,
43                                bool no_delay /*=false*/)
44     : d_socket(io_service), d_block(NULL), d_no_delay(no_delay)
45 {
46     d_buf.resize(MTU);
47     try {
48         d_socket.set_option(boost::asio::ip::tcp::no_delay(no_delay));
49     } catch (...) {
50         // Silently ignore failure (socket might be current in accept stage) and try again
51         // in 'start'
52     }
53 }
54 
send(pmt::pmt_t vector)55 void tcp_connection::send(pmt::pmt_t vector)
56 {
57     size_t len = pmt::blob_length(vector);
58 
59     // Asio async_write() requires the buffer to remain valid until the handler is called.
60     boost::shared_ptr<char[]> txbuf(new char[len]);
61 
62     size_t temp = 0;
63     memcpy(txbuf.get(), pmt::uniform_vector_elements(vector, temp), len);
64 
65     size_t offset = 0;
66     while (offset < len) {
67         // Limit the size of each write() to the MTU.
68         // FIXME: Note that this has the effect of breaking a large PDU into several
69         // smaller PDUs, each containing <= MTU bytes. Is this the desired behavior?
70         size_t send_len = std::min((len - offset), d_buf.size());
71         boost::asio::async_write(
72             d_socket,
73             boost::asio::buffer(txbuf.get() + offset, send_len),
74             boost::bind(&tcp_connection::handle_write,
75                         this,
76                         txbuf,
77                         boost::asio::placeholders::error,
78                         boost::asio::placeholders::bytes_transferred));
79         offset += send_len;
80     }
81 }
82 
start(gr::basic_block * block)83 void tcp_connection::start(gr::basic_block* block)
84 {
85     d_block = block;
86     d_socket.set_option(boost::asio::ip::tcp::no_delay(d_no_delay));
87     d_socket.async_read_some(boost::asio::buffer(d_buf),
88                              boost::bind(&tcp_connection::handle_read,
89                                          this,
90                                          boost::asio::placeholders::error,
91                                          boost::asio::placeholders::bytes_transferred));
92 }
93 
handle_read(const boost::system::error_code & error,size_t bytes_transferred)94 void tcp_connection::handle_read(const boost::system::error_code& error,
95                                  size_t bytes_transferred)
96 {
97     if (!error) {
98         if (d_block) {
99             pmt::pmt_t vector =
100                 pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_buf[0]);
101             pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
102 
103             d_block->message_port_pub(pdu::pdu_port_id(), pdu);
104         }
105 
106         d_socket.async_read_some(
107             boost::asio::buffer(d_buf),
108             boost::bind(&tcp_connection::handle_read,
109                         this,
110                         boost::asio::placeholders::error,
111                         boost::asio::placeholders::bytes_transferred));
112     } else {
113         d_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
114         d_socket.close();
115     }
116 }
117 } /* namespace blocks */
118 } /* namespace gr */
119