#include "grisu3.h"
#include <algorithm>
#include <array>
#include <cstdio>
#include <cstring>
#include <functional>
#include <future>
#include <iterator>
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

#include <cpp11/R.hpp>
#include <cpp11/function.hpp>
#include <cpp11/list.hpp>
#include <cpp11/strings.hpp>

#include "RProgress.h"
#include "connection.h"
#include "r_utils.h"

#include "unicode_fopen.h"
16
// Bit flags combined into the `options` argument of the writers below.
enum vroom_write_opt_t {
  quote_needed = 1 << 0,     // quote a string field only if needs_quote() says so
  quote_all = 1 << 1,        // quote every non-NA string field
  escape_double = 1 << 2,    // escape '"' inside strings by doubling it
  escape_backslash = 1 << 3, // escape '"' inside strings with a backslash
  bom = 1 << 4               // start the output with a UTF-8 byte order mark
};
24
get_buffer_size(const cpp11::list & input,const std::vector<SEXPTYPE> & types,size_t start,size_t end)25 size_t get_buffer_size(
26 const cpp11::list& input,
27 const std::vector<SEXPTYPE>& types,
28 size_t start,
29 size_t end) {
30
31 // First need to determine how big the buffer(s) should be
32 // - For characters we need the total nchar() + 2 (for quotes if needed)
33 // (they are converted to UTF-8 in R)
34 // - For factors we need max(nchar(levels)) (but currently we just convert to
35 // character in R)
36 // - For decimal numbers we need 24
37 // source: https://stackoverflow.com/a/52045523/2055486
38 // - For 32 bit integers we need 11 (10 for digits plus the sign)
39 // - For logical we need 5 (FALSE)
40 //
41 // - Currently we convert dates, times and datetimes to character before
42 // output. If we wanted to do it in C it would be
43 // - For dates we need 10 (2019-04-12)
44 // - For times we need 8 (01:00:00)
45 // - For datetimes we need 20 (2019-04-12T20:46:31Z)
46
47 size_t buf_size = 0;
48
49 size_t num_rows = end - start;
50
51 for (int i = 0; i < input.size(); ++i) {
52 switch (types[i]) {
53 case STRSXP: {
54 for (size_t j = start; j < end; ++j) {
55 auto sz = Rf_xlength(STRING_ELT(input[i], j));
56 buf_size += sz + 2;
57 }
58 break;
59 }
60 case LGLSXP:
61 buf_size += 5 * num_rows;
62 break;
63 case REALSXP:
64 buf_size += 24 * num_rows;
65 break;
66 case INTSXP:
67 buf_size += 11 * num_rows;
68 break;
69 }
70 }
71
72 // Add size of delimiters + newline
73 buf_size += input.size() * num_rows;
74
75 return buf_size;
76 }
77
// Decide whether string field `str` must be wrapped in double quotes.
//
// A field is quoted when it is exactly equal to `na_str` (so that it can be
// told apart from a missing value on read), or when it contains a newline,
// carriage return, double quote, or the delimiter `delim`.
//
// The NA comparison is over the whole string. Comparing only a fixed prefix
// would quote every value that merely starts with the NA marker (e.g. "NAME"
// when na_str is "NA").
bool needs_quote(const char* str, const char delim, const char* na_str) {
  if (strcmp(str, na_str) == 0) {
    return true;
  }

  for (const char* cur = str; *cur != '\0'; ++cur) {
    if (*cur == '\n' || *cur == '\r' || *cur == '"' || *cur == delim) {
      return true;
    }
  }

  return false;
}
91
// adapted from https://stackoverflow.com/a/28110728/2055486
//
// Append the characters of string literal `str` to `buf`, excluding the
// literal's terminating NUL (hence N - 1 characters).
template <size_t N>
void append_literal(std::vector<char>& buf, const char (&str)[N]) {
  const char* const first = str;
  const char* const last = str + (N - 1);
  buf.insert(buf.end(), first, last);
}
97
is_utf8(cetype_t ce)98 inline bool is_utf8(cetype_t ce) {
99 switch (ce) {
100 case CE_ANY:
101 case CE_BYTES:
102 case CE_UTF8:
103 return true;
104 default:
105 return false;
106 }
107 }
108
str_to_buf(SEXP str,std::vector<char> & buf,const char delim,const char * na_str,size_t na_len,size_t options)109 void str_to_buf(
110 SEXP str,
111 std::vector<char>& buf,
112 const char delim,
113 const char* na_str,
114 size_t na_len,
115 size_t options) {
116
117 if (str == NA_STRING) {
118 std::copy(na_str, na_str + na_len, std::back_inserter(buf));
119 return;
120 }
121
122 const char* str_p;
123 size_t len;
124 if (is_utf8(Rf_getCharCE(str))) {
125 str_p = CHAR(str);
126 len = Rf_xlength(str);
127 } else {
128 str_p = Rf_translateCharUTF8(str);
129 len = strlen(str_p);
130 }
131
132 bool should_quote =
133 options & quote_all ||
134 (options & quote_needed && needs_quote(str_p, delim, na_str));
135 if (should_quote) {
136 buf.push_back('"');
137 }
138
139 auto end = str_p + len;
140 bool should_escape = options & (escape_double | escape_backslash);
141 auto escape =
142 options & escape_double ? '"' : options & escape_backslash ? '\\' : '\0';
143
144 buf.reserve(buf.size() + len);
145 while (str_p < end) {
146 if (should_escape && *str_p == '"') {
147 buf.push_back(escape);
148 }
149 buf.push_back(*str_p++);
150 }
151
152 if (should_quote) {
153 buf.push_back('"');
154 }
155 return;
156 }
157
fill_buf(const cpp11::list & input,const char delim,const std::string & eol,const char * na_str,size_t options,const std::vector<SEXPTYPE> & types,const std::vector<void * > & ptrs,size_t begin,size_t end)158 std::vector<char> fill_buf(
159 const cpp11::list& input,
160 const char delim,
161 const std::string& eol,
162 const char* na_str,
163 size_t options,
164 const std::vector<SEXPTYPE>& types,
165 const std::vector<void*>& ptrs,
166 size_t begin,
167 size_t end) {
168
169 auto buf = std::vector<char>();
170
171 auto na_len = strlen(na_str);
172
173 for (size_t row = begin; row < end; ++row) {
174 for (int col = 0; col < input.size(); ++col) {
175 switch (types[col]) {
176 case STRSXP: {
177 auto str = STRING_ELT(input[col], row);
178 str_to_buf(str, buf, delim, na_str, na_len, options);
179 break;
180 }
181 case LGLSXP: {
182 int value = static_cast<int*>(ptrs[col])[row];
183 switch (value) {
184 case TRUE:
185 append_literal(buf, "TRUE");
186 break;
187 case FALSE:
188 append_literal(buf, "FALSE");
189 break;
190 default:
191 std::copy(na_str, na_str + na_len, std::back_inserter(buf));
192 break;
193 }
194 break;
195 }
196 case REALSXP: {
197 auto value = static_cast<double*>(ptrs[col])[row];
198 if (!R_FINITE(value)) {
199 if (ISNA(value)) {
200 std::copy(na_str, na_str + na_len, std::back_inserter(buf));
201 } else if (ISNAN(value)) {
202 std::copy(na_str, na_str + na_len, std::back_inserter(buf));
203 } else if (value > 0) {
204 append_literal(buf, "Inf");
205 } else {
206 append_literal(buf, "-Inf");
207 }
208 } else {
209 char temp_buf[33];
210 int len = dtoa_grisu3(static_cast<double*>(ptrs[col])[row], temp_buf);
211 std::copy(temp_buf, temp_buf + len, std::back_inserter(buf));
212 }
213 break;
214 }
215 case INTSXP: {
216 auto value = static_cast<int*>(ptrs[col])[row];
217 if (value == NA_INTEGER) {
218 std::copy(na_str, na_str + na_len, std::back_inserter(buf));
219 } else {
220 // TODO: use something like https://github.com/jeaiii/itoa for
221 // faster integer writing
222 char temp_buf[12];
223 auto len = sprintf(temp_buf, "%i", value);
224 std::copy(temp_buf, temp_buf + len, std::back_inserter(buf));
225 }
226 break;
227 }
228 }
229 if (delim != '\0') {
230 buf.push_back(delim);
231 }
232 }
233 if (delim != '\0') {
234 buf.pop_back();
235 }
236 for (auto c : eol) {
237 buf.push_back(c);
238 }
239 }
240
241 return buf;
242 }
243
// Write the bytes of `buf` to the output sink `out`.
// Only the explicit specializations that follow are defined. The primary
// template is deliberately declared but not defined, so that using an
// unsupported sink type fails at link time instead of silently discarding
// the output.
template <typename T> void write_buf(const std::vector<char>& buf, T& out);
245
write_buf(const std::vector<char> & buf,std::FILE * & out)246 template <> void write_buf(const std::vector<char>& buf, std::FILE*& out) {
247 std::fwrite(buf.data(), sizeof buf[0], buf.size(), out);
248 }
249
250 template <>
write_buf(const std::vector<char> & buf,std::vector<char> & data)251 void write_buf(const std::vector<char>& buf, std::vector<char>& data) {
252 std::copy(buf.begin(), buf.end(), std::back_inserter(data));
253 }
254
write_buf(const std::vector<char> & buf,SEXP & con)255 template <> void write_buf(const std::vector<char>& buf, SEXP& con) {
256 R_WriteConnection(con, (void*)buf.data(), sizeof buf[0] * buf.size());
257 }
258
259 #ifdef VROOM_USE_CONNECTIONS_API
write_buf_con(const std::vector<char> & buf,Rconnection con,bool is_stdout)260 void write_buf_con(
261 const std::vector<char>& buf, Rconnection con, bool is_stdout) {
262 if (is_stdout) {
263 std::string out;
264 std::copy(buf.begin(), buf.end(), std::back_inserter(out));
265 Rprintf("%.*s", buf.size(), out.c_str());
266 } else {
267 R_WriteConnection(con, (void*)buf.data(), sizeof buf[0] * buf.size());
268 }
269 }
270 #else
write_buf_con(const std::vector<char> & buf,SEXP con,bool is_stdout)271 void write_buf_con(const std::vector<char>& buf, SEXP con, bool is_stdout) {
272 if (is_stdout) {
273 std::string out;
274 std::copy(buf.begin(), buf.end(), std::back_inserter(out));
275 Rprintf("%.*s", buf.size(), out.c_str());
276 } else {
277 write_buf(buf, con);
278 }
279 }
280 #endif
281
get_types(const cpp11::list & input)282 std::vector<SEXPTYPE> get_types(const cpp11::list& input) {
283 std::vector<SEXPTYPE> out;
284 for (auto col : input) {
285 out.push_back(TYPEOF(col));
286 }
287 return out;
288 }
289
get_ptrs(const cpp11::list & input)290 std::vector<void*> get_ptrs(const cpp11::list& input) {
291 std::vector<void*> out;
292 for (auto col : input) {
293 switch (TYPEOF(col)) {
294 case REALSXP:
295 out.push_back(REAL(col));
296 break;
297 case INTSXP:
298 out.push_back(INTEGER(col));
299 break;
300 case LGLSXP:
301 out.push_back(LOGICAL(col));
302 break;
303 default:
304 out.push_back(nullptr);
305 }
306 }
307 return out;
308 }
309
get_header(const cpp11::list & input,const char delim,const std::string & eol,size_t options)310 std::vector<char> get_header(
311 const cpp11::list& input,
312 const char delim,
313 const std::string& eol,
314 size_t options) {
315 cpp11::strings names(input.attr("names"));
316 std::vector<char> out;
317 for (R_xlen_t i = 0; i < names.size(); ++i) {
318 auto str = STRING_ELT(names, i);
319
320 str_to_buf(str, out, delim, "", 0, options);
321 if (delim != '\0') {
322 out.push_back(delim);
323 }
324 }
325 if (delim != '\0') {
326 out.pop_back();
327 }
328 for (auto c : eol) {
329 out.push_back(c);
330 }
331 return out;
332 }
333
334 template <typename T>
vroom_write_out(const cpp11::list & input,T & out,const char delim,const std::string & eol,const char * na_str,bool col_names,bool append,size_t options,size_t num_threads,bool progress,size_t buf_lines)335 void vroom_write_out(
336 const cpp11::list& input,
337 T& out,
338 const char delim,
339 const std::string& eol,
340 const char* na_str,
341 bool col_names,
342 bool append,
343 size_t options,
344 size_t num_threads,
345 bool progress,
346 size_t buf_lines) {
347
348 size_t begin = 0;
349 size_t num_rows = Rf_xlength(input[0]);
350
351 std::array<std::vector<std::future<std::vector<char>>>, 2> futures;
352 futures[0].resize(num_threads);
353 futures[1].resize(num_threads);
354
355 std::future<size_t> write_fut;
356
357 int idx = 0;
358
359 auto types = get_types(input);
360 auto ptrs = get_ptrs(input);
361
362 if (!append && options & bom) {
363 std::vector<char> bom{'\xEF', '\xBB', '\xBF'};
364 write_buf(bom, out);
365 }
366
367 if (col_names) {
368 auto header = get_header(input, delim, eol, options);
369 write_buf(header, out);
370 }
371
372 std::unique_ptr<RProgress::RProgress> pb = nullptr;
373 if (progress) {
374 pb = std::unique_ptr<RProgress::RProgress>(
375 new RProgress::RProgress(vroom::get_pb_format("write"), 1e12));
376 }
377
378 while (begin < num_rows) {
379 size_t t = 0;
380 while (t < num_threads && begin < num_rows) {
381 auto num_lines = std::min(buf_lines, num_rows - begin);
382 auto end = begin + num_lines;
383 futures[idx][t++] = std::async(
384 fill_buf,
385 input,
386 delim,
387 eol,
388 na_str,
389 options,
390 types,
391 ptrs,
392 begin,
393 end);
394 begin += num_lines;
395 }
396
397 if (write_fut.valid()) {
398 auto sz = write_fut.get();
399 if (progress) {
400 pb->tick(sz);
401 }
402 }
403
404 write_fut = std::async([&, idx, t] {
405 size_t sz = 0;
406 for (size_t i = 0; i < t; ++i) {
407 auto buf = futures[idx][i].get();
408 write_buf(buf, out);
409 sz += buf.size();
410 }
411 return sz;
412 });
413
414 idx = (idx + 1) % 2;
415 }
416
417 // Wait for the last writing to finish
418 if (write_fut.valid()) {
419 write_fut.get();
420 if (progress) {
421 pb->update(1);
422 }
423 }
424 }
425
vroom_write_(const cpp11::list & input,const std::string & filename,const char delim,const std::string & eol,const char * na_str,bool col_names,bool append,size_t options,size_t num_threads,bool progress,size_t buf_lines)426 [[cpp11::register]] void vroom_write_(
427 const cpp11::list& input,
428 const std::string& filename,
429 const char delim,
430 const std::string& eol,
431 const char* na_str,
432 bool col_names,
433 bool append,
434 size_t options,
435 size_t num_threads,
436 bool progress,
437 size_t buf_lines) {
438
439 char mode[3] = "wb";
440 if (append) {
441 strcpy(mode, "ab");
442 }
443
444 std::FILE* out = unicode_fopen(filename.c_str(), mode);
445 if (!out) {
446 std::string msg("Cannot open file for writing:\n* ");
447 msg += '\'' + filename + '\'';
448 cpp11::stop(msg.c_str());
449 }
450
451 vroom_write_out(
452 input,
453 out,
454 delim,
455 eol,
456 na_str,
457 col_names,
458 append,
459 options,
460 num_threads,
461 progress,
462 buf_lines);
463
464 // Close the file
465 std::fclose(out);
466 }
467
468 // TODO: Think about refactoring this so it and vroom_write_ can share some
469 // code
vroom_write_connection_(const cpp11::list & input,const cpp11::sexp & con,const char delim,const std::string & eol,const char * na_str,bool col_names,size_t options,size_t num_threads,bool progress,size_t buf_lines,bool is_stdout,bool append)470 [[cpp11::register]] void vroom_write_connection_(
471 const cpp11::list& input,
472 const cpp11::sexp& con,
473 const char delim,
474 const std::string& eol,
475 const char* na_str,
476 bool col_names,
477 size_t options,
478 size_t num_threads,
479 bool progress,
480 size_t buf_lines,
481 bool is_stdout,
482 bool append) {
483
484 char mode[3] = "wb";
485 if (append) {
486 strcpy(mode, "ab");
487 }
488
489 size_t begin = 0;
490 size_t num_rows = Rf_xlength(input[0]);
491
492 auto con_ = R_GetConnection(con);
493
494 bool should_open = !is_open(con);
495 if (should_open) {
496 cpp11::package("base")["open"](con, mode);
497 }
498
499 bool should_close = should_open;
500
501 std::array<std::vector<std::future<std::vector<char>>>, 2> futures;
502 futures[0].resize(num_threads);
503 futures[1].resize(num_threads);
504
505 std::future<size_t> write_fut;
506
507 int idx = 0;
508
509 auto types = get_types(input);
510 auto ptrs = get_ptrs(input);
511
512 if (col_names) {
513 auto header = get_header(input, delim, eol, options);
514 write_buf_con(header, con_, is_stdout);
515 }
516
517 std::unique_ptr<RProgress::RProgress> pb = nullptr;
518 if (progress) {
519 pb = std::unique_ptr<RProgress::RProgress>(
520 new RProgress::RProgress(vroom::get_pb_format("write"), 1e12));
521 }
522
523 while (begin < num_rows) {
524 size_t t = 0;
525 while (t < num_threads && begin < num_rows) {
526 auto num_lines = std::min(buf_lines, num_rows - begin);
527 auto end = begin + num_lines;
528 futures[idx][t++] = std::async(
529 fill_buf,
530 input,
531 delim,
532 eol,
533 na_str,
534 options,
535 types,
536 ptrs,
537 begin,
538 end);
539 begin += num_lines;
540 }
541
542 for (size_t i = 0; i < t; ++i) {
543 auto buf = futures[idx][i].get();
544 write_buf_con(buf, con_, is_stdout);
545 auto sz = buf.size();
546 if (progress) {
547 pb->tick(sz);
548 }
549 }
550
551 idx = (idx + 1) % 2;
552 }
553
554 if (progress) {
555 pb->update(1);
556 }
557
558 // Close the connection
559 if (should_close) {
560 cpp11::package("base")["close"](con);
561 }
562 }
563
// Format `input` with vroom_write_out(), as vroom_write_() would write it,
// and return the result as a length-one character vector marked as UTF-8.
//
// Raises an R error if the formatted output exceeds the maximum length of a
// single R string (INT_MAX bytes).
[[cpp11::register]] cpp11::strings vroom_format_(
    const cpp11::list& input,
    const char delim,
    const std::string& eol,
    const char* na_str,
    bool col_names,
    bool append,
    size_t options,
    size_t num_threads,
    bool progress,
    size_t buf_lines) {

  std::vector<char> data;

  vroom_write_out(
      input,
      data,
      delim,
      eol,
      na_str,
      col_names,
      append,
      options,
      num_threads,
      progress,
      buf_lines);

  // Rf_mkCharLenCE() takes an int length. Refuse output that does not fit,
  // rather than passing a wrapped-around length.
  if (data.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
    cpp11::stop(
        "Formatted output is too large to be returned as a single string");
  }

  cpp11::writable::strings out(1);

  out[0] =
      Rf_mkCharLenCE(data.data(), static_cast<int>(data.size()), CE_UTF8);

  return out;
}
597