1 /* -*- c++ -*- */
2 /*
3  * Copyright 2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include "pdu_to_tagged_stream_impl.h"
28 #include <gnuradio/blocks/pdu.h>
29 #include <gnuradio/io_signature.h>
30 
31 namespace gr {
32 namespace blocks {
33 
make(pdu::vector_type type,const std::string & tsb_tag_key)34 pdu_to_tagged_stream::sptr pdu_to_tagged_stream::make(pdu::vector_type type,
35                                                       const std::string& tsb_tag_key)
36 {
37     return gnuradio::get_initial_sptr(new pdu_to_tagged_stream_impl(type, tsb_tag_key));
38 }
39 
pdu_to_tagged_stream_impl(pdu::vector_type type,const std::string & tsb_tag_key)40 pdu_to_tagged_stream_impl::pdu_to_tagged_stream_impl(pdu::vector_type type,
41                                                      const std::string& tsb_tag_key)
42     : tagged_stream_block("pdu_to_tagged_stream",
43                           io_signature::make(0, 0, 0),
44                           io_signature::make(1, 1, pdu::itemsize(type)),
45                           tsb_tag_key),
46       d_itemsize(pdu::itemsize(type)),
47       d_curr_len(0)
48 {
49     message_port_register_in(pdu::pdu_port_id());
50 }
51 
calculate_output_stream_length(const gr_vector_int &)52 int pdu_to_tagged_stream_impl::calculate_output_stream_length(const gr_vector_int&)
53 {
54     if (d_curr_len == 0) {
55         pmt::pmt_t msg(delete_head_nowait(pdu::pdu_port_id()));
56         if (msg.get() == NULL) {
57             return 0;
58         }
59 
60         if (!pmt::is_pair(msg))
61             throw std::runtime_error("received a malformed pdu message");
62 
63         d_curr_meta = pmt::car(msg);
64         d_curr_vect = pmt::cdr(msg);
65         // do not assume the length of  PMT is in items (e.g.: from socket_pdu)
66         d_curr_len = pmt::blob_length(d_curr_vect) / d_itemsize;
67     }
68 
69     return d_curr_len;
70 }
71 
work(int noutput_items,gr_vector_int & ninput_items,gr_vector_const_void_star & input_items,gr_vector_void_star & output_items)72 int pdu_to_tagged_stream_impl::work(int noutput_items,
73                                     gr_vector_int& ninput_items,
74                                     gr_vector_const_void_star& input_items,
75                                     gr_vector_void_star& output_items)
76 {
77     uint8_t* out = (uint8_t*)output_items[0];
78 
79     if (d_curr_len == 0) {
80         return 0;
81     }
82 
83     // work() should only be called if the current PDU fits entirely
84     // into the output buffer.
85     assert(noutput_items >= 0 && (unsigned int)noutput_items >= d_curr_len);
86 
87     // Copy vector output
88     size_t nout = d_curr_len;
89     size_t io(0);
90     const uint8_t* ptr = (const uint8_t*)uniform_vector_elements(d_curr_vect, io);
91     memcpy(out, ptr, d_curr_len * d_itemsize);
92 
93     // Copy tags
94     if (!pmt::eq(d_curr_meta, pmt::PMT_NIL)) {
95         pmt::pmt_t klist(pmt::dict_keys(d_curr_meta));
96         for (size_t i = 0; i < pmt::length(klist); i++) {
97             pmt::pmt_t k(pmt::nth(i, klist));
98             pmt::pmt_t v(pmt::dict_ref(d_curr_meta, k, pmt::PMT_NIL));
99             add_item_tag(0, nitems_written(0), k, v, alias_pmt());
100         }
101     }
102 
103     // Reset state
104     d_curr_len = 0;
105 
106     return nout;
107 } /* work() */
108 
109 } /* namespace blocks */
110 } /* namespace gr */
111