1 /* -*- c++ -*- */
2 /*
3  * Copyright 2012,2018 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include "file_meta_sink_impl.h"
28 #include <gnuradio/io_signature.h>
29 #include <fcntl.h>
30 #include <stdio.h>
31 #include <sys/stat.h>
32 #include <sys/types.h>
33 #include <cstdio>
34 #include <stdexcept>
35 
36 // win32 (mingw/msvc) specific
37 #ifdef HAVE_IO_H
38 #include <io.h>
39 #endif
40 #ifdef O_BINARY
41 #define OUR_O_BINARY O_BINARY
42 #else
43 #define OUR_O_BINARY 0
44 #endif
45 
46 // should be handled via configure
47 #ifdef O_LARGEFILE
48 #define OUR_O_LARGEFILE O_LARGEFILE
49 #else
50 #define OUR_O_LARGEFILE 0
51 #endif
52 
53 
54 namespace gr {
55 namespace blocks {
56 
57 
make(size_t itemsize,const std::string & filename,double samp_rate,double relative_rate,gr_file_types type,bool complex,size_t max_segment_size,pmt::pmt_t extra_dict,bool detached_header)58 file_meta_sink::sptr file_meta_sink::make(size_t itemsize,
59                                           const std::string& filename,
60                                           double samp_rate,
61                                           double relative_rate,
62                                           gr_file_types type,
63                                           bool complex,
64                                           size_t max_segment_size,
65                                           pmt::pmt_t extra_dict,
66                                           bool detached_header)
67 {
68     return gnuradio::get_initial_sptr(new file_meta_sink_impl(itemsize,
69                                                               filename,
70                                                               samp_rate,
71                                                               relative_rate,
72                                                               type,
73                                                               complex,
74                                                               max_segment_size,
75                                                               extra_dict,
76                                                               detached_header));
77 }
78 
file_meta_sink_impl(size_t itemsize,const std::string & filename,double samp_rate,double relative_rate,gr_file_types type,bool complex,size_t max_segment_size,pmt::pmt_t extra_dict,bool detached_header)79 file_meta_sink_impl::file_meta_sink_impl(size_t itemsize,
80                                          const std::string& filename,
81                                          double samp_rate,
82                                          double relative_rate,
83                                          gr_file_types type,
84                                          bool complex,
85                                          size_t max_segment_size,
86                                          pmt::pmt_t extra_dict,
87                                          bool detached_header)
88     : sync_block("file_meta_sink",
89                  io_signature::make(1, 1, itemsize),
90                  io_signature::make(0, 0, 0)),
91       d_itemsize(itemsize),
92       d_samp_rate(samp_rate),
93       d_relative_rate(relative_rate),
94       d_max_seg_size(max_segment_size),
95       d_total_seg_size(0),
96       d_updated(false),
97       d_unbuffered(false)
98 {
99     d_fp = 0;
100     d_new_fp = 0;
101     d_hdr_fp = 0;
102     d_new_hdr_fp = 0;
103 
104     if (detached_header == true)
105         d_state = STATE_DETACHED;
106     else
107         d_state = STATE_INLINE;
108 
109     if (!open(filename))
110         throw std::runtime_error("file_meta_sink: can't open file\n");
111 
112     pmt::pmt_t timestamp = pmt::make_tuple(pmt::from_uint64(0), pmt::from_double(0));
113 
114     // handle extra dictionary
115     d_extra = pmt::make_dict();
116     pmt::pmt_t keys = pmt::dict_keys(extra_dict);
117     pmt::pmt_t vals = pmt::dict_values(extra_dict);
118     size_t nitems = pmt::length(keys);
119     for (size_t i = 0; i < nitems; i++) {
120         d_extra = pmt::dict_add(d_extra, pmt::nth(i, keys), pmt::nth(i, vals));
121     }
122 
123     d_extra_size = pmt::serialize_str(d_extra).size();
124 
125     d_header = pmt::make_dict();
126     d_header = pmt::dict_add(d_header, pmt::mp("version"), pmt::mp(METADATA_VERSION));
127     d_header = pmt::dict_add(d_header, pmt::mp("rx_rate"), pmt::mp(samp_rate));
128     d_header = pmt::dict_add(d_header, pmt::mp("rx_time"), timestamp);
129     d_header = pmt::dict_add(d_header, pmt::mp("size"), pmt::from_long(d_itemsize));
130     d_header = pmt::dict_add(d_header, pmt::mp("type"), pmt::from_long(type));
131     d_header =
132         pmt::dict_add(d_header, pmt::mp("cplx"), complex ? pmt::PMT_T : pmt::PMT_F);
133     d_header = pmt::dict_add(
134         d_header, pmt::mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size));
135     d_header = pmt::dict_add(d_header, mp("bytes"), pmt::from_uint64(0));
136 
137     do_update();
138 
139     if (d_state == STATE_DETACHED)
140         write_header(d_hdr_fp, d_header, d_extra);
141     else
142         write_header(d_fp, d_header, d_extra);
143 }
144 
~file_meta_sink_impl()145 file_meta_sink_impl::~file_meta_sink_impl() { close(); }
146 
open(const std::string & filename)147 bool file_meta_sink_impl::open(const std::string& filename)
148 {
149     bool ret = true;
150     if (d_state == STATE_DETACHED) {
151         std::string s = filename + ".hdr";
152         ret = _open(&d_new_hdr_fp, s.c_str());
153     }
154 
155     ret = ret && _open(&d_new_fp, filename.c_str());
156     d_updated = true;
157     return ret;
158 }
159 
_open(FILE ** fp,const char * filename)160 bool file_meta_sink_impl::_open(FILE** fp, const char* filename)
161 {
162     gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function
163 
164     bool ret = true;
165     int fd;
166 
167     if ((fd = ::open(filename,
168                      O_WRONLY | O_CREAT | O_TRUNC | OUR_O_LARGEFILE | OUR_O_BINARY,
169                      0664)) < 0) {
170         perror(filename);
171         return false;
172     }
173 
174     if (*fp) { // if we've already got a new one open, close it
175         fclose(*fp);
176         fp = 0;
177     }
178 
179     if ((*fp = fdopen(fd, "wb")) == NULL) {
180         perror(filename);
181         ::close(fd); // don't leak file descriptor if fdopen fails.
182     }
183 
184     ret = fp != 0;
185 
186     return ret;
187 }
188 
close()189 void file_meta_sink_impl::close()
190 {
191     gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function
192     update_last_header();
193 
194     if (d_state == STATE_DETACHED) {
195         if (d_new_hdr_fp) {
196             fclose(d_new_hdr_fp);
197             d_new_hdr_fp = 0;
198         }
199     }
200 
201     if (d_new_fp) {
202         fclose(d_new_fp);
203         d_new_fp = 0;
204     }
205     d_updated = true;
206 
207     if (d_fp) {
208         fclose(d_fp);
209         d_fp = 0;
210     }
211 
212     if (d_state == STATE_DETACHED) {
213         if (d_hdr_fp) {
214             fclose(d_hdr_fp);
215             d_hdr_fp = 0;
216         }
217     }
218 }
219 
do_update()220 void file_meta_sink_impl::do_update()
221 {
222     if (d_updated) {
223         gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this block
224         if (d_state == STATE_DETACHED) {
225             if (d_hdr_fp)
226                 fclose(d_hdr_fp);
227             d_hdr_fp = d_new_hdr_fp; // install new file pointer
228             d_new_hdr_fp = 0;
229         }
230 
231         if (d_fp)
232             fclose(d_fp);
233         d_fp = d_new_fp; // install new file pointer
234         d_new_fp = 0;
235 
236         d_updated = false;
237     }
238 }
239 
write_header(FILE * fp,pmt::pmt_t header,pmt::pmt_t extra)240 void file_meta_sink_impl::write_header(FILE* fp, pmt::pmt_t header, pmt::pmt_t extra)
241 {
242     std::string header_str = pmt::serialize_str(header);
243     std::string extra_str = pmt::serialize_str(extra);
244 
245     if ((header_str.size() != METADATA_HEADER_SIZE) || (extra_str.size() != d_extra_size))
246         throw std::runtime_error("file_meta_sink: header or extra_dict is wrong size.\n");
247 
248     size_t nwritten = 0;
249     while (nwritten < header_str.size()) {
250         std::string sub = header_str.substr(nwritten);
251         int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
252         nwritten += count;
253         if ((count == 0) && (ferror(fp))) {
254             fclose(fp);
255             throw std::runtime_error("file_meta_sink: error writing header to file.\n");
256         }
257     }
258 
259     nwritten = 0;
260     while (nwritten < extra_str.size()) {
261         std::string sub = extra_str.substr(nwritten);
262         int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
263         nwritten += count;
264         if ((count == 0) && (ferror(fp))) {
265             fclose(fp);
266             throw std::runtime_error("file_meta_sink: error writing extra to file.\n");
267         }
268     }
269 
270     fflush(fp);
271 }
272 
update_header(pmt::pmt_t key,pmt::pmt_t value)273 void file_meta_sink_impl::update_header(pmt::pmt_t key, pmt::pmt_t value)
274 {
275     // Special handling caveat to transform rate from radio source into
276     // the rate at this sink.
277     if (pmt::eq(key, mp("rx_rate"))) {
278         d_samp_rate = pmt::to_double(value);
279         value = pmt::from_double(d_samp_rate * d_relative_rate);
280     }
281 
282     // If the tag is not part of the standard header, we put it into the
283     // extra data, which either updates the current dictionary or adds a
284     // new item.
285     if (pmt::dict_has_key(d_header, key)) {
286         d_header = pmt::dict_add(d_header, key, value);
287     } else {
288         d_extra = pmt::dict_add(d_extra, key, value);
289         d_extra_size = pmt::serialize_str(d_extra).size();
290     }
291 }
292 
update_last_header()293 void file_meta_sink_impl::update_last_header()
294 {
295     if (d_state == STATE_DETACHED) {
296         if (d_hdr_fp)
297             update_last_header_detached();
298     } else {
299         if (d_fp)
300             update_last_header_inline();
301     }
302 }
303 
update_last_header_inline()304 void file_meta_sink_impl::update_last_header_inline()
305 {
306     // Update the last header info with the number of samples this
307     // block represents.
308 
309     size_t hdrlen = pmt::to_uint64(pmt::dict_ref(d_header, mp("strt"), pmt::PMT_NIL));
310     size_t seg_size = d_itemsize * d_total_seg_size;
311     pmt::pmt_t s = pmt::from_uint64(seg_size);
312     update_header(mp("bytes"), s);
313     update_header(mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size));
314     if (fseek(d_fp, -seg_size - hdrlen, SEEK_CUR) == -1) {
315         throw std::runtime_error("fseek() failed.");
316     }
317     write_header(d_fp, d_header, d_extra);
318     if (fseek(d_fp, seg_size, SEEK_CUR) == -1) {
319         throw std::runtime_error("fseek() failed.");
320     }
321 }
322 
update_last_header_detached()323 void file_meta_sink_impl::update_last_header_detached()
324 {
325     // Update the last header info with the number of samples this
326     // block represents.
327     size_t hdrlen = pmt::to_uint64(pmt::dict_ref(d_header, mp("strt"), pmt::PMT_NIL));
328     size_t seg_size = d_itemsize * d_total_seg_size;
329     pmt::pmt_t s = pmt::from_uint64(seg_size);
330     update_header(mp("bytes"), s);
331     update_header(mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size));
332     if (fseek(d_hdr_fp, -hdrlen, SEEK_CUR) == -1) {
333         throw std::runtime_error("fseek() failed.");
334     }
335     write_header(d_hdr_fp, d_header, d_extra);
336 }
337 
write_and_update()338 void file_meta_sink_impl::write_and_update()
339 {
340     // New header, so set current size of chunk to 0 and start of chunk
341     // based on current index + header size.
342     // uint64_t loc = get_last_header_loc();
343     pmt::pmt_t s = pmt::from_uint64(0);
344     update_header(mp("bytes"), s);
345 
346     // If we have multiple tags on the same offset, this makes
347     // sure we just overwrite the same header each time instead
348     // of creating a new header per tag.
349     s = pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size);
350     update_header(mp("strt"), s);
351 
352     if (d_state == STATE_DETACHED)
353         write_header(d_hdr_fp, d_header, d_extra);
354     else
355         write_header(d_fp, d_header, d_extra);
356 }
357 
update_rx_time()358 void file_meta_sink_impl::update_rx_time()
359 {
360     pmt::pmt_t rx_time = pmt::string_to_symbol("rx_time");
361     pmt::pmt_t r = pmt::dict_ref(d_header, rx_time, pmt::PMT_NIL);
362     uint64_t secs = pmt::to_uint64(pmt::tuple_ref(r, 0));
363     double fracs = pmt::to_double(pmt::tuple_ref(r, 1));
364     double diff = d_total_seg_size / (d_samp_rate * d_relative_rate);
365 
366     // std::cerr << "old secs:  " << secs << std::endl;
367     // std::cerr << "old fracs: " << fracs << std::endl;
368     // std::cerr << "seg size:  " << d_total_seg_size << std::endl;
369     // std::cerr << "diff:      " << diff << std::endl;
370 
371     fracs += diff;
372     uint64_t new_secs = static_cast<uint64_t>(fracs);
373     secs += new_secs;
374     fracs -= new_secs;
375 
376     // std::cerr << "new secs:  " << secs << std::endl;
377     // std::cerr << "new fracs: " << fracs << std::endl << std::endl;
378 
379     r = pmt::make_tuple(pmt::from_uint64(secs), pmt::from_double(fracs));
380     d_header = pmt::dict_add(d_header, rx_time, r);
381 }
382 
work(int noutput_items,gr_vector_const_void_star & input_items,gr_vector_void_star & output_items)383 int file_meta_sink_impl::work(int noutput_items,
384                               gr_vector_const_void_star& input_items,
385                               gr_vector_void_star& output_items)
386 {
387     char* inbuf = (char*)input_items[0];
388     int nwritten = 0;
389 
390     do_update(); // update d_fp is reqd
391 
392     if (!d_fp)
393         return noutput_items; // drop output on the floor
394 
395     uint64_t abs_N = nitems_read(0);
396     uint64_t end_N = abs_N + (uint64_t)(noutput_items);
397     std::vector<tag_t> all_tags;
398     get_tags_in_range(all_tags, 0, abs_N, end_N);
399 
400     std::vector<tag_t>::iterator itr;
401     for (itr = all_tags.begin(); itr != all_tags.end(); itr++) {
402         int item_offset = (int)(itr->offset - abs_N);
403 
404         // Write date to file up to the next tag location
405         while (nwritten < item_offset) {
406             size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
407                                       (size_t)(item_offset - nwritten));
408             int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
409             if (count == 0) // FIXME add error handling
410                 break;
411             nwritten += count;
412             inbuf += count * d_itemsize;
413 
414             d_total_seg_size += count;
415 
416             // Only add a new header if we are not at the position of the
417             // next tag
418             if ((d_total_seg_size == d_max_seg_size) && (nwritten < item_offset)) {
419                 update_last_header();
420                 update_rx_time();
421                 write_and_update();
422                 d_total_seg_size = 0;
423             }
424         }
425 
426         if (d_total_seg_size > 0) {
427             update_last_header();
428             update_header(itr->key, itr->value);
429             write_and_update();
430             d_total_seg_size = 0;
431         } else {
432             update_header(itr->key, itr->value);
433             update_last_header();
434         }
435     }
436 
437     // Finish up the rest of the data after tags
438     while (nwritten < noutput_items) {
439         size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
440                                   (size_t)(noutput_items - nwritten));
441         int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
442         if (count == 0) // FIXME add error handling
443             break;
444         nwritten += count;
445         inbuf += count * d_itemsize;
446 
447         d_total_seg_size += count;
448         if (d_total_seg_size == d_max_seg_size) {
449             update_last_header();
450             update_rx_time();
451             write_and_update();
452             d_total_seg_size = 0;
453         }
454     }
455 
456     if (d_unbuffered)
457         fflush(d_fp);
458 
459     return nwritten;
460 }
461 
462 } /* namespace blocks */
463 } /* namespace gr */
464