1 /* -*- c++ -*- */
2 /*
3 * Copyright 2013 Free Software Foundation, Inc.
4 *
5 * This file is part of GNU Radio
6 *
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
10 * any later version.
11 *
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include "tcp_connection.h"
28 #include <gnuradio/basic_block.h>
29 #include <gnuradio/blocks/pdu.h>
30
31 namespace gr {
32 namespace blocks {
33
make(boost::asio::io_service & io_service,int MTU,bool no_delay)34 tcp_connection::sptr tcp_connection::make(boost::asio::io_service& io_service,
35 int MTU /*= 10000*/,
36 bool no_delay /*=false*/)
37 {
38 return sptr(new tcp_connection(io_service, MTU, no_delay));
39 }
40
tcp_connection(boost::asio::io_service & io_service,int MTU,bool no_delay)41 tcp_connection::tcp_connection(boost::asio::io_service& io_service,
42 int MTU /*= 10000*/,
43 bool no_delay /*=false*/)
44 : d_socket(io_service), d_block(NULL), d_no_delay(no_delay)
45 {
46 d_buf.resize(MTU);
47 try {
48 d_socket.set_option(boost::asio::ip::tcp::no_delay(no_delay));
49 } catch (...) {
50 // Silently ignore failure (socket might be current in accept stage) and try again
51 // in 'start'
52 }
53 }
54
send(pmt::pmt_t vector)55 void tcp_connection::send(pmt::pmt_t vector)
56 {
57 size_t len = pmt::blob_length(vector);
58
59 // Asio async_write() requires the buffer to remain valid until the handler is called.
60 boost::shared_ptr<char[]> txbuf(new char[len]);
61
62 size_t temp = 0;
63 memcpy(txbuf.get(), pmt::uniform_vector_elements(vector, temp), len);
64
65 size_t offset = 0;
66 while (offset < len) {
67 // Limit the size of each write() to the MTU.
68 // FIXME: Note that this has the effect of breaking a large PDU into several
69 // smaller PDUs, each containing <= MTU bytes. Is this the desired behavior?
70 size_t send_len = std::min((len - offset), d_buf.size());
71 boost::asio::async_write(
72 d_socket,
73 boost::asio::buffer(txbuf.get() + offset, send_len),
74 boost::bind(&tcp_connection::handle_write,
75 this,
76 txbuf,
77 boost::asio::placeholders::error,
78 boost::asio::placeholders::bytes_transferred));
79 offset += send_len;
80 }
81 }
82
start(gr::basic_block * block)83 void tcp_connection::start(gr::basic_block* block)
84 {
85 d_block = block;
86 d_socket.set_option(boost::asio::ip::tcp::no_delay(d_no_delay));
87 d_socket.async_read_some(boost::asio::buffer(d_buf),
88 boost::bind(&tcp_connection::handle_read,
89 this,
90 boost::asio::placeholders::error,
91 boost::asio::placeholders::bytes_transferred));
92 }
93
handle_read(const boost::system::error_code & error,size_t bytes_transferred)94 void tcp_connection::handle_read(const boost::system::error_code& error,
95 size_t bytes_transferred)
96 {
97 if (!error) {
98 if (d_block) {
99 pmt::pmt_t vector =
100 pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_buf[0]);
101 pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
102
103 d_block->message_port_pub(pdu::pdu_port_id(), pdu);
104 }
105
106 d_socket.async_read_some(
107 boost::asio::buffer(d_buf),
108 boost::bind(&tcp_connection::handle_read,
109 this,
110 boost::asio::placeholders::error,
111 boost::asio::placeholders::bytes_transferred));
112 } else {
113 d_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
114 d_socket.close();
115 }
116 }
117 } /* namespace blocks */
118 } /* namespace gr */
119