1 /* -*- c++ -*- */
2 /*
3  * Copyright 2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #ifdef HAVE_IO_H
28 #include <io.h>
29 #endif
30 
31 #ifdef HAVE_WINDOWS_H
32 #include <winsock2.h>
33 #endif
34 
35 #include "stream_pdu_base.h"
36 #include <gnuradio/basic_block.h>
37 #include <gnuradio/blocks/pdu.h>
38 #include <boost/format.hpp>
39 
// Timeout, in microseconds, handed to select() by wait_ready(): 100 ms.
static const long timeout_us = 100L * 1000L;
41 
42 namespace gr {
43 namespace blocks {
44 
stream_pdu_base(int MTU)45 stream_pdu_base::stream_pdu_base(int MTU) : d_fd(-1), d_started(false), d_finished(false)
46 {
47     // reserve space for rx buffer
48     d_rxbuf.resize(MTU, 0);
49 }
50 
~stream_pdu_base()51 stream_pdu_base::~stream_pdu_base() { stop_rxthread(); }
52 
start_rxthread(basic_block * blk,pmt::pmt_t port)53 void stream_pdu_base::start_rxthread(basic_block* blk, pmt::pmt_t port)
54 {
55     d_blk = blk;
56     d_port = port;
57     d_thread = gr::thread::thread(boost::bind(&stream_pdu_base::run, this));
58     d_started = true;
59 }
60 
stop_rxthread()61 void stream_pdu_base::stop_rxthread()
62 {
63     d_finished = true;
64 
65     if (d_started) {
66         d_thread.interrupt();
67         d_thread.join();
68     }
69 }
70 
run()71 void stream_pdu_base::run()
72 {
73     while (!d_finished) {
74         if (!wait_ready())
75             continue;
76 
77         const int result = read(d_fd, &d_rxbuf[0], d_rxbuf.size());
78         if (result <= 0)
79             throw std::runtime_error("stream_pdu_base, bad socket read!");
80 
81         pmt::pmt_t vector = pmt::init_u8vector(result, &d_rxbuf[0]);
82         pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
83 
84         d_blk->message_port_pub(d_port, pdu);
85     }
86 }
87 
wait_ready()88 bool stream_pdu_base::wait_ready()
89 {
90     // setup timeval for timeout
91     timeval tv;
92     tv.tv_sec = 0;
93     tv.tv_usec = timeout_us;
94 
95     // setup rset for timeout
96     fd_set rset;
97     FD_ZERO(&rset);
98     FD_SET(d_fd, &rset);
99 
100     // call select with timeout on receive socket
101     return ::select(d_fd + 1, &rset, NULL, NULL, &tv) > 0;
102 }
103 
send(pmt::pmt_t msg)104 void stream_pdu_base::send(pmt::pmt_t msg)
105 {
106     pmt::pmt_t vector = pmt::cdr(msg);
107     size_t offset(0);
108     size_t itemsize(pdu::itemsize(pdu::type_from_pmt(vector)));
109     int len(pmt::length(vector) * itemsize);
110 
111     const int rv = write(d_fd, pmt::uniform_vector_elements(vector, offset), len);
112     if (rv != len) {
113         std::cerr << boost::format("WARNING: stream_pdu_base::send(pdu) write failed! "
114                                    "(d_fd=%d, len=%d, rv=%d)") %
115                          d_fd % len % rv
116                   << std::endl;
117     }
118 }
119 
120 } /* namespace blocks */
121 } /* namespace gr */
122