#include "grisu3.h"
#include <algorithm>
#include <array>
#include <climits>
#include <cstdio>
#include <cstring>
#include <future>
#include <iterator>
#include <memory>
#include <string>
#include <vector>

#include <cpp11/R.hpp>
#include <cpp11/function.hpp>
#include <cpp11/list.hpp>
#include <cpp11/strings.hpp>

#include "RProgress.h"
#include "connection.h"
#include "r_utils.h"

#include "unicode_fopen.h"
16 
// Bit flags that are OR-ed together into the `options` argument taken by the
// writer functions in this file. Each flag occupies its own bit.
typedef enum {
  quote_needed = 1 << 0,     // 1
  quote_all = 1 << 1,        // 2
  escape_double = 1 << 2,    // 4
  escape_backslash = 1 << 3, // 8
  bom = 1 << 4               // 16
} vroom_write_opt_t;
24 
get_buffer_size(const cpp11::list & input,const std::vector<SEXPTYPE> & types,size_t start,size_t end)25 size_t get_buffer_size(
26     const cpp11::list& input,
27     const std::vector<SEXPTYPE>& types,
28     size_t start,
29     size_t end) {
30 
31   // First need to determine how big the buffer(s) should be
32   // - For characters we need the total nchar() + 2 (for quotes if needed)
33   //   (they are converted to UTF-8 in R)
34   // - For factors we need max(nchar(levels)) (but currently we just convert to
35   //   character in R)
36   // - For decimal numbers we need 24
37   //   source: https://stackoverflow.com/a/52045523/2055486
38   // - For 32 bit integers we need 11 (10 for digits plus the sign)
39   // - For logical we need 5 (FALSE)
40   //
41   // - Currently we convert dates, times and datetimes to character before
42   //   output. If we wanted to do it in C it would be
43   //   - For dates we need 10 (2019-04-12)
44   //   - For times we need 8 (01:00:00)
45   //   - For datetimes we need 20 (2019-04-12T20:46:31Z)
46 
47   size_t buf_size = 0;
48 
49   size_t num_rows = end - start;
50 
51   for (int i = 0; i < input.size(); ++i) {
52     switch (types[i]) {
53     case STRSXP: {
54       for (size_t j = start; j < end; ++j) {
55         auto sz = Rf_xlength(STRING_ELT(input[i], j));
56         buf_size += sz + 2;
57       }
58       break;
59     }
60     case LGLSXP:
61       buf_size += 5 * num_rows;
62       break;
63     case REALSXP:
64       buf_size += 24 * num_rows;
65       break;
66     case INTSXP:
67       buf_size += 11 * num_rows;
68       break;
69     }
70   }
71 
72   // Add size of delimiters + newline
73   buf_size += input.size() * num_rows;
74 
75   return buf_size;
76 }
77 
// Decides whether the NUL-terminated string `str` has to be wrapped in quotes
// to be written unambiguously.
//
// Returns true when
//   - `str` is exactly equal to the NA marker `na_str` (so that a literal
//     value cannot be confused with a missing value), or
//   - `str` contains a newline, a carriage return, a double quote or the
//     delimiter `delim`.
//
// The comparison against `na_str` covers the whole string: a value that only
// shares a prefix with the NA marker (e.g. "NATO" when the marker is "NA")
// does not need quoting.
bool needs_quote(const char* str, const char delim, const char* na_str) {
  if (strcmp(str, na_str) == 0) {
    return true;
  }

  for (const char* cur = str; *cur != '\0'; ++cur) {
    const char c = *cur;
    if (c == '\n' || c == '\r' || c == '"' || c == delim) {
      return true;
    }
  }

  return false;
}
91 
// adapted from https://stackoverflow.com/a/28110728/2055486
// Appends the contents of the character array `str` to `buf`, leaving out the
// array's last element (the terminating NUL of a string literal).
template <size_t N>
void append_literal(std::vector<char>& buf, const char (&str)[N]) {
  const char* const first = str;
  const char* const last = str + (N - 1);
  buf.insert(buf.end(), first, last);
}
97 
is_utf8(cetype_t ce)98 inline bool is_utf8(cetype_t ce) {
99   switch (ce) {
100   case CE_ANY:
101   case CE_BYTES:
102   case CE_UTF8:
103     return true;
104   default:
105     return false;
106   }
107 }
108 
str_to_buf(SEXP str,std::vector<char> & buf,const char delim,const char * na_str,size_t na_len,size_t options)109 void str_to_buf(
110     SEXP str,
111     std::vector<char>& buf,
112     const char delim,
113     const char* na_str,
114     size_t na_len,
115     size_t options) {
116 
117   if (str == NA_STRING) {
118     std::copy(na_str, na_str + na_len, std::back_inserter(buf));
119     return;
120   }
121 
122   const char* str_p;
123   size_t len;
124   if (is_utf8(Rf_getCharCE(str))) {
125     str_p = CHAR(str);
126     len = Rf_xlength(str);
127   } else {
128     str_p = Rf_translateCharUTF8(str);
129     len = strlen(str_p);
130   }
131 
132   bool should_quote =
133       options & quote_all ||
134       (options & quote_needed && needs_quote(str_p, delim, na_str));
135   if (should_quote) {
136     buf.push_back('"');
137   }
138 
139   auto end = str_p + len;
140   bool should_escape = options & (escape_double | escape_backslash);
141   auto escape =
142       options & escape_double ? '"' : options & escape_backslash ? '\\' : '\0';
143 
144   buf.reserve(buf.size() + len);
145   while (str_p < end) {
146     if (should_escape && *str_p == '"') {
147       buf.push_back(escape);
148     }
149     buf.push_back(*str_p++);
150   }
151 
152   if (should_quote) {
153     buf.push_back('"');
154   }
155   return;
156 }
157 
fill_buf(const cpp11::list & input,const char delim,const std::string & eol,const char * na_str,size_t options,const std::vector<SEXPTYPE> & types,const std::vector<void * > & ptrs,size_t begin,size_t end)158 std::vector<char> fill_buf(
159     const cpp11::list& input,
160     const char delim,
161     const std::string& eol,
162     const char* na_str,
163     size_t options,
164     const std::vector<SEXPTYPE>& types,
165     const std::vector<void*>& ptrs,
166     size_t begin,
167     size_t end) {
168 
169   auto buf = std::vector<char>();
170 
171   auto na_len = strlen(na_str);
172 
173   for (size_t row = begin; row < end; ++row) {
174     for (int col = 0; col < input.size(); ++col) {
175       switch (types[col]) {
176       case STRSXP: {
177         auto str = STRING_ELT(input[col], row);
178         str_to_buf(str, buf, delim, na_str, na_len, options);
179         break;
180       }
181       case LGLSXP: {
182         int value = static_cast<int*>(ptrs[col])[row];
183         switch (value) {
184         case TRUE:
185           append_literal(buf, "TRUE");
186           break;
187         case FALSE:
188           append_literal(buf, "FALSE");
189           break;
190         default:
191           std::copy(na_str, na_str + na_len, std::back_inserter(buf));
192           break;
193         }
194         break;
195       }
196       case REALSXP: {
197         auto value = static_cast<double*>(ptrs[col])[row];
198         if (!R_FINITE(value)) {
199           if (ISNA(value)) {
200             std::copy(na_str, na_str + na_len, std::back_inserter(buf));
201           } else if (ISNAN(value)) {
202             std::copy(na_str, na_str + na_len, std::back_inserter(buf));
203           } else if (value > 0) {
204             append_literal(buf, "Inf");
205           } else {
206             append_literal(buf, "-Inf");
207           }
208         } else {
209           char temp_buf[33];
210           int len = dtoa_grisu3(static_cast<double*>(ptrs[col])[row], temp_buf);
211           std::copy(temp_buf, temp_buf + len, std::back_inserter(buf));
212         }
213         break;
214       }
215       case INTSXP: {
216         auto value = static_cast<int*>(ptrs[col])[row];
217         if (value == NA_INTEGER) {
218           std::copy(na_str, na_str + na_len, std::back_inserter(buf));
219         } else {
220           // TODO: use something like https://github.com/jeaiii/itoa for
221           // faster integer writing
222           char temp_buf[12];
223           auto len = sprintf(temp_buf, "%i", value);
224           std::copy(temp_buf, temp_buf + len, std::back_inserter(buf));
225         }
226         break;
227       }
228       }
229       if (delim != '\0') {
230         buf.push_back(delim);
231       }
232     }
233     if (delim != '\0') {
234       buf.pop_back();
235     }
236     for (auto c : eol) {
237       buf.push_back(c);
238     }
239   }
240 
241   return buf;
242 }
243 
// Primary template of write_buf: does nothing. The sinks that actually
// receive data are handled by the explicit specializations that follow.
template <typename T>
void write_buf(const std::vector<char>& /*buf*/, T& /*out*/) {
}
245 
write_buf(const std::vector<char> & buf,std::FILE * & out)246 template <> void write_buf(const std::vector<char>& buf, std::FILE*& out) {
247   std::fwrite(buf.data(), sizeof buf[0], buf.size(), out);
248 }
249 
250 template <>
write_buf(const std::vector<char> & buf,std::vector<char> & data)251 void write_buf(const std::vector<char>& buf, std::vector<char>& data) {
252   std::copy(buf.begin(), buf.end(), std::back_inserter(data));
253 }
254 
write_buf(const std::vector<char> & buf,SEXP & con)255 template <> void write_buf(const std::vector<char>& buf, SEXP& con) {
256   R_WriteConnection(con, (void*)buf.data(), sizeof buf[0] * buf.size());
257 }
258 
259 #ifdef VROOM_USE_CONNECTIONS_API
write_buf_con(const std::vector<char> & buf,Rconnection con,bool is_stdout)260 void write_buf_con(
261     const std::vector<char>& buf, Rconnection con, bool is_stdout) {
262   if (is_stdout) {
263     std::string out;
264     std::copy(buf.begin(), buf.end(), std::back_inserter(out));
265     Rprintf("%.*s", buf.size(), out.c_str());
266   } else {
267     R_WriteConnection(con, (void*)buf.data(), sizeof buf[0] * buf.size());
268   }
269 }
270 #else
write_buf_con(const std::vector<char> & buf,SEXP con,bool is_stdout)271 void write_buf_con(const std::vector<char>& buf, SEXP con, bool is_stdout) {
272   if (is_stdout) {
273     std::string out;
274     std::copy(buf.begin(), buf.end(), std::back_inserter(out));
275     Rprintf("%.*s", buf.size(), out.c_str());
276   } else {
277     write_buf(buf, con);
278   }
279 }
280 #endif
281 
get_types(const cpp11::list & input)282 std::vector<SEXPTYPE> get_types(const cpp11::list& input) {
283   std::vector<SEXPTYPE> out;
284   for (auto col : input) {
285     out.push_back(TYPEOF(col));
286   }
287   return out;
288 }
289 
get_ptrs(const cpp11::list & input)290 std::vector<void*> get_ptrs(const cpp11::list& input) {
291   std::vector<void*> out;
292   for (auto col : input) {
293     switch (TYPEOF(col)) {
294     case REALSXP:
295       out.push_back(REAL(col));
296       break;
297     case INTSXP:
298       out.push_back(INTEGER(col));
299       break;
300     case LGLSXP:
301       out.push_back(LOGICAL(col));
302       break;
303     default:
304       out.push_back(nullptr);
305     }
306   }
307   return out;
308 }
309 
get_header(const cpp11::list & input,const char delim,const std::string & eol,size_t options)310 std::vector<char> get_header(
311     const cpp11::list& input,
312     const char delim,
313     const std::string& eol,
314     size_t options) {
315   cpp11::strings names(input.attr("names"));
316   std::vector<char> out;
317   for (R_xlen_t i = 0; i < names.size(); ++i) {
318     auto str = STRING_ELT(names, i);
319 
320     str_to_buf(str, out, delim, "", 0, options);
321     if (delim != '\0') {
322       out.push_back(delim);
323     }
324   }
325   if (delim != '\0') {
326     out.pop_back();
327   }
328   for (auto c : eol) {
329     out.push_back(c);
330   }
331   return out;
332 }
333 
334 template <typename T>
vroom_write_out(const cpp11::list & input,T & out,const char delim,const std::string & eol,const char * na_str,bool col_names,bool append,size_t options,size_t num_threads,bool progress,size_t buf_lines)335 void vroom_write_out(
336     const cpp11::list& input,
337     T& out,
338     const char delim,
339     const std::string& eol,
340     const char* na_str,
341     bool col_names,
342     bool append,
343     size_t options,
344     size_t num_threads,
345     bool progress,
346     size_t buf_lines) {
347 
348   size_t begin = 0;
349   size_t num_rows = Rf_xlength(input[0]);
350 
351   std::array<std::vector<std::future<std::vector<char>>>, 2> futures;
352   futures[0].resize(num_threads);
353   futures[1].resize(num_threads);
354 
355   std::future<size_t> write_fut;
356 
357   int idx = 0;
358 
359   auto types = get_types(input);
360   auto ptrs = get_ptrs(input);
361 
362   if (!append && options & bom) {
363     std::vector<char> bom{'\xEF', '\xBB', '\xBF'};
364     write_buf(bom, out);
365   }
366 
367   if (col_names) {
368     auto header = get_header(input, delim, eol, options);
369     write_buf(header, out);
370   }
371 
372   std::unique_ptr<RProgress::RProgress> pb = nullptr;
373   if (progress) {
374     pb = std::unique_ptr<RProgress::RProgress>(
375         new RProgress::RProgress(vroom::get_pb_format("write"), 1e12));
376   }
377 
378   while (begin < num_rows) {
379     size_t t = 0;
380     while (t < num_threads && begin < num_rows) {
381       auto num_lines = std::min(buf_lines, num_rows - begin);
382       auto end = begin + num_lines;
383       futures[idx][t++] = std::async(
384           fill_buf,
385           input,
386           delim,
387           eol,
388           na_str,
389           options,
390           types,
391           ptrs,
392           begin,
393           end);
394       begin += num_lines;
395     }
396 
397     if (write_fut.valid()) {
398       auto sz = write_fut.get();
399       if (progress) {
400         pb->tick(sz);
401       }
402     }
403 
404     write_fut = std::async([&, idx, t] {
405       size_t sz = 0;
406       for (size_t i = 0; i < t; ++i) {
407         auto buf = futures[idx][i].get();
408         write_buf(buf, out);
409         sz += buf.size();
410       }
411       return sz;
412     });
413 
414     idx = (idx + 1) % 2;
415   }
416 
417   // Wait for the last writing to finish
418   if (write_fut.valid()) {
419     write_fut.get();
420     if (progress) {
421       pb->update(1);
422     }
423   }
424 }
425 
vroom_write_(const cpp11::list & input,const std::string & filename,const char delim,const std::string & eol,const char * na_str,bool col_names,bool append,size_t options,size_t num_threads,bool progress,size_t buf_lines)426 [[cpp11::register]] void vroom_write_(
427     const cpp11::list& input,
428     const std::string& filename,
429     const char delim,
430     const std::string& eol,
431     const char* na_str,
432     bool col_names,
433     bool append,
434     size_t options,
435     size_t num_threads,
436     bool progress,
437     size_t buf_lines) {
438 
439   char mode[3] = "wb";
440   if (append) {
441     strcpy(mode, "ab");
442   }
443 
444   std::FILE* out = unicode_fopen(filename.c_str(), mode);
445   if (!out) {
446     std::string msg("Cannot open file for writing:\n* ");
447     msg += '\'' + filename + '\'';
448     cpp11::stop(msg.c_str());
449   }
450 
451   vroom_write_out(
452       input,
453       out,
454       delim,
455       eol,
456       na_str,
457       col_names,
458       append,
459       options,
460       num_threads,
461       progress,
462       buf_lines);
463 
464   // Close the file
465   std::fclose(out);
466 }
467 
468 // TODO: Think about refactoring this so it and vroom_write_ can share some
469 // code
vroom_write_connection_(const cpp11::list & input,const cpp11::sexp & con,const char delim,const std::string & eol,const char * na_str,bool col_names,size_t options,size_t num_threads,bool progress,size_t buf_lines,bool is_stdout,bool append)470 [[cpp11::register]] void vroom_write_connection_(
471     const cpp11::list& input,
472     const cpp11::sexp& con,
473     const char delim,
474     const std::string& eol,
475     const char* na_str,
476     bool col_names,
477     size_t options,
478     size_t num_threads,
479     bool progress,
480     size_t buf_lines,
481     bool is_stdout,
482     bool append) {
483 
484   char mode[3] = "wb";
485   if (append) {
486     strcpy(mode, "ab");
487   }
488 
489   size_t begin = 0;
490   size_t num_rows = Rf_xlength(input[0]);
491 
492   auto con_ = R_GetConnection(con);
493 
494   bool should_open = !is_open(con);
495   if (should_open) {
496     cpp11::package("base")["open"](con, mode);
497   }
498 
499   bool should_close = should_open;
500 
501   std::array<std::vector<std::future<std::vector<char>>>, 2> futures;
502   futures[0].resize(num_threads);
503   futures[1].resize(num_threads);
504 
505   std::future<size_t> write_fut;
506 
507   int idx = 0;
508 
509   auto types = get_types(input);
510   auto ptrs = get_ptrs(input);
511 
512   if (col_names) {
513     auto header = get_header(input, delim, eol, options);
514     write_buf_con(header, con_, is_stdout);
515   }
516 
517   std::unique_ptr<RProgress::RProgress> pb = nullptr;
518   if (progress) {
519     pb = std::unique_ptr<RProgress::RProgress>(
520         new RProgress::RProgress(vroom::get_pb_format("write"), 1e12));
521   }
522 
523   while (begin < num_rows) {
524     size_t t = 0;
525     while (t < num_threads && begin < num_rows) {
526       auto num_lines = std::min(buf_lines, num_rows - begin);
527       auto end = begin + num_lines;
528       futures[idx][t++] = std::async(
529           fill_buf,
530           input,
531           delim,
532           eol,
533           na_str,
534           options,
535           types,
536           ptrs,
537           begin,
538           end);
539       begin += num_lines;
540     }
541 
542     for (size_t i = 0; i < t; ++i) {
543       auto buf = futures[idx][i].get();
544       write_buf_con(buf, con_, is_stdout);
545       auto sz = buf.size();
546       if (progress) {
547         pb->tick(sz);
548       }
549     }
550 
551     idx = (idx + 1) % 2;
552   }
553 
554   if (progress) {
555     pb->update(1);
556   }
557 
558   // Close the connection
559   if (should_close) {
560     cpp11::package("base")["close"](con);
561   }
562 }
563 
// Formats `input` as delimited text and returns it as a character vector of
// length one, marked as UTF-8.
//
// The text is produced by vroom_write_out writing into an in-memory buffer;
// all arguments are forwarded to it unchanged. An R error is raised when the
// formatted text is longer than INT_MAX bytes, because the length handed to
// Rf_mkCharLenCE is an int and a larger size would otherwise be narrowed
// silently.
[[cpp11::register]] cpp11::strings vroom_format_(
    const cpp11::list& input,
    const char delim,
    const std::string& eol,
    const char* na_str,
    bool col_names,
    bool append,
    size_t options,
    size_t num_threads,
    bool progress,
    size_t buf_lines) {

  std::vector<char> data;

  vroom_write_out(
      input,
      data,
      delim,
      eol,
      na_str,
      col_names,
      append,
      options,
      num_threads,
      progress,
      buf_lines);

  if (data.size() > static_cast<size_t>(INT_MAX)) {
    cpp11::stop("Formatted output is too large to fit in a single string");
  }

  cpp11::writable::strings out(1);

  out[0] =
      Rf_mkCharLenCE(data.data(), static_cast<int>(data.size()), CE_UTF8);

  return out;
}
597