1 /* -*- c++ -*- */
2 /*
3 * Copyright 2012,2018 Free Software Foundation, Inc.
4 *
5 * This file is part of GNU Radio
6 *
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
10 * any later version.
11 *
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include "file_meta_sink_impl.h"
28 #include <gnuradio/io_signature.h>
29 #include <fcntl.h>
30 #include <stdio.h>
31 #include <sys/stat.h>
32 #include <sys/types.h>
33 #include <cstdio>
34 #include <stdexcept>
35
36 // win32 (mingw/msvc) specific
37 #ifdef HAVE_IO_H
38 #include <io.h>
39 #endif
40 #ifdef O_BINARY
41 #define OUR_O_BINARY O_BINARY
42 #else
43 #define OUR_O_BINARY 0
44 #endif
45
46 // should be handled via configure
47 #ifdef O_LARGEFILE
48 #define OUR_O_LARGEFILE O_LARGEFILE
49 #else
50 #define OUR_O_LARGEFILE 0
51 #endif
52
53
54 namespace gr {
55 namespace blocks {
56
57
make(size_t itemsize,const std::string & filename,double samp_rate,double relative_rate,gr_file_types type,bool complex,size_t max_segment_size,pmt::pmt_t extra_dict,bool detached_header)58 file_meta_sink::sptr file_meta_sink::make(size_t itemsize,
59 const std::string& filename,
60 double samp_rate,
61 double relative_rate,
62 gr_file_types type,
63 bool complex,
64 size_t max_segment_size,
65 pmt::pmt_t extra_dict,
66 bool detached_header)
67 {
68 return gnuradio::get_initial_sptr(new file_meta_sink_impl(itemsize,
69 filename,
70 samp_rate,
71 relative_rate,
72 type,
73 complex,
74 max_segment_size,
75 extra_dict,
76 detached_header));
77 }
78
file_meta_sink_impl(size_t itemsize,const std::string & filename,double samp_rate,double relative_rate,gr_file_types type,bool complex,size_t max_segment_size,pmt::pmt_t extra_dict,bool detached_header)79 file_meta_sink_impl::file_meta_sink_impl(size_t itemsize,
80 const std::string& filename,
81 double samp_rate,
82 double relative_rate,
83 gr_file_types type,
84 bool complex,
85 size_t max_segment_size,
86 pmt::pmt_t extra_dict,
87 bool detached_header)
88 : sync_block("file_meta_sink",
89 io_signature::make(1, 1, itemsize),
90 io_signature::make(0, 0, 0)),
91 d_itemsize(itemsize),
92 d_samp_rate(samp_rate),
93 d_relative_rate(relative_rate),
94 d_max_seg_size(max_segment_size),
95 d_total_seg_size(0),
96 d_updated(false),
97 d_unbuffered(false)
98 {
99 d_fp = 0;
100 d_new_fp = 0;
101 d_hdr_fp = 0;
102 d_new_hdr_fp = 0;
103
104 if (detached_header == true)
105 d_state = STATE_DETACHED;
106 else
107 d_state = STATE_INLINE;
108
109 if (!open(filename))
110 throw std::runtime_error("file_meta_sink: can't open file\n");
111
112 pmt::pmt_t timestamp = pmt::make_tuple(pmt::from_uint64(0), pmt::from_double(0));
113
114 // handle extra dictionary
115 d_extra = pmt::make_dict();
116 pmt::pmt_t keys = pmt::dict_keys(extra_dict);
117 pmt::pmt_t vals = pmt::dict_values(extra_dict);
118 size_t nitems = pmt::length(keys);
119 for (size_t i = 0; i < nitems; i++) {
120 d_extra = pmt::dict_add(d_extra, pmt::nth(i, keys), pmt::nth(i, vals));
121 }
122
123 d_extra_size = pmt::serialize_str(d_extra).size();
124
125 d_header = pmt::make_dict();
126 d_header = pmt::dict_add(d_header, pmt::mp("version"), pmt::mp(METADATA_VERSION));
127 d_header = pmt::dict_add(d_header, pmt::mp("rx_rate"), pmt::mp(samp_rate));
128 d_header = pmt::dict_add(d_header, pmt::mp("rx_time"), timestamp);
129 d_header = pmt::dict_add(d_header, pmt::mp("size"), pmt::from_long(d_itemsize));
130 d_header = pmt::dict_add(d_header, pmt::mp("type"), pmt::from_long(type));
131 d_header =
132 pmt::dict_add(d_header, pmt::mp("cplx"), complex ? pmt::PMT_T : pmt::PMT_F);
133 d_header = pmt::dict_add(
134 d_header, pmt::mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size));
135 d_header = pmt::dict_add(d_header, mp("bytes"), pmt::from_uint64(0));
136
137 do_update();
138
139 if (d_state == STATE_DETACHED)
140 write_header(d_hdr_fp, d_header, d_extra);
141 else
142 write_header(d_fp, d_header, d_extra);
143 }
144
~file_meta_sink_impl()145 file_meta_sink_impl::~file_meta_sink_impl() { close(); }
146
open(const std::string & filename)147 bool file_meta_sink_impl::open(const std::string& filename)
148 {
149 bool ret = true;
150 if (d_state == STATE_DETACHED) {
151 std::string s = filename + ".hdr";
152 ret = _open(&d_new_hdr_fp, s.c_str());
153 }
154
155 ret = ret && _open(&d_new_fp, filename.c_str());
156 d_updated = true;
157 return ret;
158 }
159
_open(FILE ** fp,const char * filename)160 bool file_meta_sink_impl::_open(FILE** fp, const char* filename)
161 {
162 gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function
163
164 bool ret = true;
165 int fd;
166
167 if ((fd = ::open(filename,
168 O_WRONLY | O_CREAT | O_TRUNC | OUR_O_LARGEFILE | OUR_O_BINARY,
169 0664)) < 0) {
170 perror(filename);
171 return false;
172 }
173
174 if (*fp) { // if we've already got a new one open, close it
175 fclose(*fp);
176 fp = 0;
177 }
178
179 if ((*fp = fdopen(fd, "wb")) == NULL) {
180 perror(filename);
181 ::close(fd); // don't leak file descriptor if fdopen fails.
182 }
183
184 ret = fp != 0;
185
186 return ret;
187 }
188
close()189 void file_meta_sink_impl::close()
190 {
191 gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function
192 update_last_header();
193
194 if (d_state == STATE_DETACHED) {
195 if (d_new_hdr_fp) {
196 fclose(d_new_hdr_fp);
197 d_new_hdr_fp = 0;
198 }
199 }
200
201 if (d_new_fp) {
202 fclose(d_new_fp);
203 d_new_fp = 0;
204 }
205 d_updated = true;
206
207 if (d_fp) {
208 fclose(d_fp);
209 d_fp = 0;
210 }
211
212 if (d_state == STATE_DETACHED) {
213 if (d_hdr_fp) {
214 fclose(d_hdr_fp);
215 d_hdr_fp = 0;
216 }
217 }
218 }
219
do_update()220 void file_meta_sink_impl::do_update()
221 {
222 if (d_updated) {
223 gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this block
224 if (d_state == STATE_DETACHED) {
225 if (d_hdr_fp)
226 fclose(d_hdr_fp);
227 d_hdr_fp = d_new_hdr_fp; // install new file pointer
228 d_new_hdr_fp = 0;
229 }
230
231 if (d_fp)
232 fclose(d_fp);
233 d_fp = d_new_fp; // install new file pointer
234 d_new_fp = 0;
235
236 d_updated = false;
237 }
238 }
239
write_header(FILE * fp,pmt::pmt_t header,pmt::pmt_t extra)240 void file_meta_sink_impl::write_header(FILE* fp, pmt::pmt_t header, pmt::pmt_t extra)
241 {
242 std::string header_str = pmt::serialize_str(header);
243 std::string extra_str = pmt::serialize_str(extra);
244
245 if ((header_str.size() != METADATA_HEADER_SIZE) || (extra_str.size() != d_extra_size))
246 throw std::runtime_error("file_meta_sink: header or extra_dict is wrong size.\n");
247
248 size_t nwritten = 0;
249 while (nwritten < header_str.size()) {
250 std::string sub = header_str.substr(nwritten);
251 int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
252 nwritten += count;
253 if ((count == 0) && (ferror(fp))) {
254 fclose(fp);
255 throw std::runtime_error("file_meta_sink: error writing header to file.\n");
256 }
257 }
258
259 nwritten = 0;
260 while (nwritten < extra_str.size()) {
261 std::string sub = extra_str.substr(nwritten);
262 int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
263 nwritten += count;
264 if ((count == 0) && (ferror(fp))) {
265 fclose(fp);
266 throw std::runtime_error("file_meta_sink: error writing extra to file.\n");
267 }
268 }
269
270 fflush(fp);
271 }
272
update_header(pmt::pmt_t key,pmt::pmt_t value)273 void file_meta_sink_impl::update_header(pmt::pmt_t key, pmt::pmt_t value)
274 {
275 // Special handling caveat to transform rate from radio source into
276 // the rate at this sink.
277 if (pmt::eq(key, mp("rx_rate"))) {
278 d_samp_rate = pmt::to_double(value);
279 value = pmt::from_double(d_samp_rate * d_relative_rate);
280 }
281
282 // If the tag is not part of the standard header, we put it into the
283 // extra data, which either updates the current dictionary or adds a
284 // new item.
285 if (pmt::dict_has_key(d_header, key)) {
286 d_header = pmt::dict_add(d_header, key, value);
287 } else {
288 d_extra = pmt::dict_add(d_extra, key, value);
289 d_extra_size = pmt::serialize_str(d_extra).size();
290 }
291 }
292
update_last_header()293 void file_meta_sink_impl::update_last_header()
294 {
295 if (d_state == STATE_DETACHED) {
296 if (d_hdr_fp)
297 update_last_header_detached();
298 } else {
299 if (d_fp)
300 update_last_header_inline();
301 }
302 }
303
update_last_header_inline()304 void file_meta_sink_impl::update_last_header_inline()
305 {
306 // Update the last header info with the number of samples this
307 // block represents.
308
309 size_t hdrlen = pmt::to_uint64(pmt::dict_ref(d_header, mp("strt"), pmt::PMT_NIL));
310 size_t seg_size = d_itemsize * d_total_seg_size;
311 pmt::pmt_t s = pmt::from_uint64(seg_size);
312 update_header(mp("bytes"), s);
313 update_header(mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size));
314 if (fseek(d_fp, -seg_size - hdrlen, SEEK_CUR) == -1) {
315 throw std::runtime_error("fseek() failed.");
316 }
317 write_header(d_fp, d_header, d_extra);
318 if (fseek(d_fp, seg_size, SEEK_CUR) == -1) {
319 throw std::runtime_error("fseek() failed.");
320 }
321 }
322
update_last_header_detached()323 void file_meta_sink_impl::update_last_header_detached()
324 {
325 // Update the last header info with the number of samples this
326 // block represents.
327 size_t hdrlen = pmt::to_uint64(pmt::dict_ref(d_header, mp("strt"), pmt::PMT_NIL));
328 size_t seg_size = d_itemsize * d_total_seg_size;
329 pmt::pmt_t s = pmt::from_uint64(seg_size);
330 update_header(mp("bytes"), s);
331 update_header(mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size));
332 if (fseek(d_hdr_fp, -hdrlen, SEEK_CUR) == -1) {
333 throw std::runtime_error("fseek() failed.");
334 }
335 write_header(d_hdr_fp, d_header, d_extra);
336 }
337
write_and_update()338 void file_meta_sink_impl::write_and_update()
339 {
340 // New header, so set current size of chunk to 0 and start of chunk
341 // based on current index + header size.
342 // uint64_t loc = get_last_header_loc();
343 pmt::pmt_t s = pmt::from_uint64(0);
344 update_header(mp("bytes"), s);
345
346 // If we have multiple tags on the same offset, this makes
347 // sure we just overwrite the same header each time instead
348 // of creating a new header per tag.
349 s = pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size);
350 update_header(mp("strt"), s);
351
352 if (d_state == STATE_DETACHED)
353 write_header(d_hdr_fp, d_header, d_extra);
354 else
355 write_header(d_fp, d_header, d_extra);
356 }
357
update_rx_time()358 void file_meta_sink_impl::update_rx_time()
359 {
360 pmt::pmt_t rx_time = pmt::string_to_symbol("rx_time");
361 pmt::pmt_t r = pmt::dict_ref(d_header, rx_time, pmt::PMT_NIL);
362 uint64_t secs = pmt::to_uint64(pmt::tuple_ref(r, 0));
363 double fracs = pmt::to_double(pmt::tuple_ref(r, 1));
364 double diff = d_total_seg_size / (d_samp_rate * d_relative_rate);
365
366 // std::cerr << "old secs: " << secs << std::endl;
367 // std::cerr << "old fracs: " << fracs << std::endl;
368 // std::cerr << "seg size: " << d_total_seg_size << std::endl;
369 // std::cerr << "diff: " << diff << std::endl;
370
371 fracs += diff;
372 uint64_t new_secs = static_cast<uint64_t>(fracs);
373 secs += new_secs;
374 fracs -= new_secs;
375
376 // std::cerr << "new secs: " << secs << std::endl;
377 // std::cerr << "new fracs: " << fracs << std::endl << std::endl;
378
379 r = pmt::make_tuple(pmt::from_uint64(secs), pmt::from_double(fracs));
380 d_header = pmt::dict_add(d_header, rx_time, r);
381 }
382
work(int noutput_items,gr_vector_const_void_star & input_items,gr_vector_void_star & output_items)383 int file_meta_sink_impl::work(int noutput_items,
384 gr_vector_const_void_star& input_items,
385 gr_vector_void_star& output_items)
386 {
387 char* inbuf = (char*)input_items[0];
388 int nwritten = 0;
389
390 do_update(); // update d_fp is reqd
391
392 if (!d_fp)
393 return noutput_items; // drop output on the floor
394
395 uint64_t abs_N = nitems_read(0);
396 uint64_t end_N = abs_N + (uint64_t)(noutput_items);
397 std::vector<tag_t> all_tags;
398 get_tags_in_range(all_tags, 0, abs_N, end_N);
399
400 std::vector<tag_t>::iterator itr;
401 for (itr = all_tags.begin(); itr != all_tags.end(); itr++) {
402 int item_offset = (int)(itr->offset - abs_N);
403
404 // Write date to file up to the next tag location
405 while (nwritten < item_offset) {
406 size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
407 (size_t)(item_offset - nwritten));
408 int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
409 if (count == 0) // FIXME add error handling
410 break;
411 nwritten += count;
412 inbuf += count * d_itemsize;
413
414 d_total_seg_size += count;
415
416 // Only add a new header if we are not at the position of the
417 // next tag
418 if ((d_total_seg_size == d_max_seg_size) && (nwritten < item_offset)) {
419 update_last_header();
420 update_rx_time();
421 write_and_update();
422 d_total_seg_size = 0;
423 }
424 }
425
426 if (d_total_seg_size > 0) {
427 update_last_header();
428 update_header(itr->key, itr->value);
429 write_and_update();
430 d_total_seg_size = 0;
431 } else {
432 update_header(itr->key, itr->value);
433 update_last_header();
434 }
435 }
436
437 // Finish up the rest of the data after tags
438 while (nwritten < noutput_items) {
439 size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
440 (size_t)(noutput_items - nwritten));
441 int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
442 if (count == 0) // FIXME add error handling
443 break;
444 nwritten += count;
445 inbuf += count * d_itemsize;
446
447 d_total_seg_size += count;
448 if (d_total_seg_size == d_max_seg_size) {
449 update_last_header();
450 update_rx_time();
451 write_and_update();
452 d_total_seg_size = 0;
453 }
454 }
455
456 if (d_unbuffered)
457 fflush(d_fp);
458
459 return nwritten;
460 }
461
462 } /* namespace blocks */
463 } /* namespace gr */
464