1 /* -*- c++ -*- */
2 /*
3 * Copyright 2013 Free Software Foundation, Inc.
4 *
5 * This file is part of GNU Radio
6 *
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
10 * any later version.
11 *
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include "pdu_to_tagged_stream_impl.h"
28 #include <gnuradio/blocks/pdu.h>
29 #include <gnuradio/io_signature.h>
30
31 namespace gr {
32 namespace blocks {
33
make(pdu::vector_type type,const std::string & tsb_tag_key)34 pdu_to_tagged_stream::sptr pdu_to_tagged_stream::make(pdu::vector_type type,
35 const std::string& tsb_tag_key)
36 {
37 return gnuradio::get_initial_sptr(new pdu_to_tagged_stream_impl(type, tsb_tag_key));
38 }
39
pdu_to_tagged_stream_impl(pdu::vector_type type,const std::string & tsb_tag_key)40 pdu_to_tagged_stream_impl::pdu_to_tagged_stream_impl(pdu::vector_type type,
41 const std::string& tsb_tag_key)
42 : tagged_stream_block("pdu_to_tagged_stream",
43 io_signature::make(0, 0, 0),
44 io_signature::make(1, 1, pdu::itemsize(type)),
45 tsb_tag_key),
46 d_itemsize(pdu::itemsize(type)),
47 d_curr_len(0)
48 {
49 message_port_register_in(pdu::pdu_port_id());
50 }
51
calculate_output_stream_length(const gr_vector_int &)52 int pdu_to_tagged_stream_impl::calculate_output_stream_length(const gr_vector_int&)
53 {
54 if (d_curr_len == 0) {
55 pmt::pmt_t msg(delete_head_nowait(pdu::pdu_port_id()));
56 if (msg.get() == NULL) {
57 return 0;
58 }
59
60 if (!pmt::is_pair(msg))
61 throw std::runtime_error("received a malformed pdu message");
62
63 d_curr_meta = pmt::car(msg);
64 d_curr_vect = pmt::cdr(msg);
65 // do not assume the length of PMT is in items (e.g.: from socket_pdu)
66 d_curr_len = pmt::blob_length(d_curr_vect) / d_itemsize;
67 }
68
69 return d_curr_len;
70 }
71
work(int noutput_items,gr_vector_int & ninput_items,gr_vector_const_void_star & input_items,gr_vector_void_star & output_items)72 int pdu_to_tagged_stream_impl::work(int noutput_items,
73 gr_vector_int& ninput_items,
74 gr_vector_const_void_star& input_items,
75 gr_vector_void_star& output_items)
76 {
77 uint8_t* out = (uint8_t*)output_items[0];
78
79 if (d_curr_len == 0) {
80 return 0;
81 }
82
83 // work() should only be called if the current PDU fits entirely
84 // into the output buffer.
85 assert(noutput_items >= 0 && (unsigned int)noutput_items >= d_curr_len);
86
87 // Copy vector output
88 size_t nout = d_curr_len;
89 size_t io(0);
90 const uint8_t* ptr = (const uint8_t*)uniform_vector_elements(d_curr_vect, io);
91 memcpy(out, ptr, d_curr_len * d_itemsize);
92
93 // Copy tags
94 if (!pmt::eq(d_curr_meta, pmt::PMT_NIL)) {
95 pmt::pmt_t klist(pmt::dict_keys(d_curr_meta));
96 for (size_t i = 0; i < pmt::length(klist); i++) {
97 pmt::pmt_t k(pmt::nth(i, klist));
98 pmt::pmt_t v(pmt::dict_ref(d_curr_meta, k, pmt::PMT_NIL));
99 add_item_tag(0, nitems_written(0), k, v, alias_pmt());
100 }
101 }
102
103 // Reset state
104 d_curr_len = 0;
105
106 return nout;
107 } /* work() */
108
109 } /* namespace blocks */
110 } /* namespace gr */
111