1 #include "dt_stdio.h"
2 #include <errno.h>
3 #include <unistd.h> // for access()
4 #include <fcntl.h>
5 #include <stdbool.h> // true and false
6 #include <stdint.h> // INT32_MIN
7 #include <math.h> // isfinite, isnan
8 #include <stdlib.h> // abs
9 #include <string.h> // strlen, strerror
10 #ifndef NOZLIB
11 #include <zlib.h> // for compression to .gz
12 #endif
13
14 #ifdef WIN32
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <io.h>
18 #define WRITE _write
19 #define CLOSE _close
20 #else
21 #define WRITE write
22 #define CLOSE close
23 #endif
24
25 #include "myomp.h"
26 #include "fwriteLookups.h"
27 #include "fwrite.h"
28
// Number of significant figures writeFloat64() produces, and the matching power of ten
// used there to scale the [1,10) mantissa into an integer.
#define NUM_SF   15
#define SIZE_SF  1000000000000000ULL  // 10^NUM_SF

// Function-like macros: each argument may be evaluated twice, so do not pass expressions with side effects.
#define MIN(a,b) (((a)<(b))?(a):(b))
#define MAX(a,b) (((a)>(b))?(a):(b))

// Globals for this file only. Assigned at the start of fwriteMain() from its args and then
// only read by the field writers in this file.
static const char *na;                 // text written for missing values; by default "" or if set (not recommended) then usually "NA"
static char sep;                       // field separator; comma in .csv files
static char sep2;                      // '|' within list columns. Used here to know if a field should be quoted and in freadR.c to write sep2 in list columns
                                       // NOTE(review): "freadR.c" may be intended to read "fwriteR.c" -- confirm
static char dec;                       // the '.' in the number 3.1416. In Europe often: 3,1416
static int8_t doQuote=INT8_MIN;        // whether to surround fields with double quote ". INT8_MIN (NA) means 'auto' (default)
static bool qmethodEscape=false;       // when quoting fields, how to escape double quotes in the field contents:
                                       // false (default) doubles the quote, true prefixes it with a backslash
static int scipen;                     // added to the width threshold in writeFloat64() when choosing decimal vs scientific notation
static bool squashDateTime=false;      // false=ISO(yyyy-mm-dd) true=squash(yyyymmdd)
static bool verbose=false;             // enables the diagnostic DTPRINT output in fwriteMain()

// Accessors defined outside this file; they are handed the opaque column pointers that this
// file never dereferences itself for string-like columns.
extern const char *getString(const void *, int64_t);
extern int getStringLen(const void *, int64_t);
extern int getMaxStringLen(const void *, int64_t);
extern int getMaxCategLen(const void *);
extern int getMaxListItemLen(const void *, int64_t);
extern const char *getCategString(const void *, int64_t);
extern double wallclock(void);
53
// Copy the NUL-terminated string x to the output cursor *pch and advance *pch past the
// bytes written. Similar to C's strcpy but i) doesn't write the trailing \0 and ii) moves
// the destination along. The caller must guarantee the destination has room for strlen(x) bytes.
//
// The plain (non-inline) declaration below is deliberate: under C99/C11 rules a function whose
// every file-scope declaration says 'inline' is only an "inline definition" and no external
// symbol is emitted, so any call the compiler chooses not to inline fails at link time.
// One declaration without 'inline' makes this translation unit provide the external definition.
void write_chars(const char *x, char **pch);
inline void write_chars(const char *x, char **pch)
{
  char *ch = *pch;
  while (*x) *ch++=*x++;
  *pch = ch;
}
61
// Write element `row` of an 8-bit boolean column at *pch as a single character:
// '1' when the value is exactly 1, otherwise '0'.
// A missing value (INT8_MIN) produces an empty field: the byte is still stored (branch free)
// but the cursor is not advanced over it. The configured `na` string is not used here.
void writeBool8(int8_t *col, int64_t row, char **pch)
{
  const int8_t value = col[row];
  char *out = *pch;
  out[0] = '0'+(value==1);
  *pch = out + (value!=INT8_MIN);  // advance by one unless NA
}
69
writeBool32(int32_t * col,int64_t row,char ** pch)70 void writeBool32(int32_t *col, int64_t row, char **pch)
71 {
72 int32_t x = col[row];
73 char *ch = *pch;
74 if (x==INT32_MIN) { // TODO: when na=='\0' as recommended, use a branchless writer
75 write_chars(na, &ch);
76 } else {
77 *ch++ = '0'+x;
78 }
79 *pch = ch;
80 }
81
writeBool32AsString(int32_t * col,int64_t row,char ** pch)82 void writeBool32AsString(int32_t *col, int64_t row, char **pch)
83 {
84 int32_t x = col[row];
85 char *ch = *pch;
86 if (x == INT32_MIN) {
87 write_chars(na, &ch);
88 } else if (x) {
89 *ch++='T'; *ch++='R'; *ch++='U'; *ch++='E';
90 } else {
91 *ch++='F'; *ch++='A'; *ch++='L'; *ch++='S'; *ch++='E';
92 }
93 *pch = ch;
94 }
95
// Reverse, in place, the characters in the half-open range [low, upp).
// `upp` points one past the last character to be reversed.
static inline void reverse(char *upp, char *low)
{
  for (char *hi = upp-1, *lo = low; hi > lo; hi--, lo++) {
    const char held = *lo;
    *lo = *hi;
    *hi = held;
  }
}
107
writeInt32(int32_t * col,int64_t row,char ** pch)108 void writeInt32(int32_t *col, int64_t row, char **pch)
109 {
110 char *ch = *pch;
111 int32_t x = col[row];
112 if (x == INT32_MIN) {
113 write_chars(na, &ch);
114 } else {
115 if (x<0) { *ch++ = '-'; x=-x; }
116 // Avoid log() for speed. Write backwards then reverse when we know how long.
117 char *low = ch;
118 do { *ch++ = '0'+x%10; x/=10; } while (x>0);
119 reverse(ch, low);
120 }
121 *pch = ch;
122 }
123
writeInt64(int64_t * col,int64_t row,char ** pch)124 void writeInt64(int64_t *col, int64_t row, char **pch)
125 {
126 char *ch = *pch;
127 int64_t x = col[row];
128 if (x == INT64_MIN) {
129 write_chars(na, &ch);
130 } else {
131 if (x<0) { *ch++ = '-'; x=-x; }
132 char *low = ch;
133 do { *ch++ = '0'+x%10; x/=10; } while (x>0);
134 reverse(ch, low);
135 }
136 *pch = ch;
137 }
138
139 /*
 * Generate fwriteLookups.h which defines sigparts, expsig and exppow that writeFloat64() (which follows) uses.
141 * It was run once a long time ago in dev and we don't need to generate it again unless we change it.
142 * Commented out and left here in the file where its result is used, in case we need it in future.
143 * Reason: ldexpl may not be available on all platforms and is slower than a direct lookup when it is.
144 *
145 void genLookups() {
146 FILE *f = fopen("/tmp/fwriteLookups.h", "w");
147 fprintf(f, "//\n\
148 // Generated by fwrite.c:genLookups()\n\
149 //\n\
150 // 3 vectors: sigparts, expsig and exppow\n\
151 // Includes precision higher than double; leave this compiler on this machine\n\
152 // to parse the literals at reduced precision.\n\
153 // 2^(-1023:1024) is held more accurately than double provides by storing its\n\
154 // exponent separately (expsig and exppow)\n\
155 // We don't want to depend on 'long double' (>64bit) availability to generate\n\
156 // these at runtime; libraries and hardware vary.\n\
157 // These small lookup tables are used for speed.\n\
158 //\n\n");
159 fprintf(f, "const double sigparts[53] = {\n0.0,\n");
160 for (int i=1; i<=52; i++) {
161 fprintf(f, "%.40Le%s\n",ldexpl(1.0L,-i), i==52?"":",");
162 }
163 fprintf(f, "};\n\nconst double expsig[2048] = {\n");
164 char x[2048][60];
165 for (int i=0; i<2048; i++) {
166 sprintf(x[i], "%.40Le", ldexpl(1.0L, i-1023));
167 fprintf(f, "%.*s%s\n", (int)(strchr(x[i],'e')-x[i]), x[i], (i==2047?"":",") );
168 }
169 fprintf(f, "};\n\nconst int exppow[2048] = {\n");
170 for (int i=0; i<2048; i++) {
171 fprintf(f, "%d%s", atoi(strchr(x[i],'e')+1), (i==2047?"":",") );
172 }
173 fprintf(f, "};\n\n");
174 fclose(f);
175 return R_NilValue;
176 }
177 */
178
// Write element `row` of a double column at *pch as text and advance *pch.
//   NaN        -> the `na` string
//   +/-Inf     -> "Inf" / "-Inf"
//   0.0        -> "0"
//   otherwise  -> up to NUM_SF significant figures, in decimal or scientific notation,
//                 whichever is narrower after `scipen` is added to the scientific width;
//                 `dec` is used as the decimal mark.
// Reads the lookup tables sigparts, expsig and exppow from fwriteLookups.h.
void writeFloat64(double *col, int64_t row, char **pch)
{
  // hand-rolled / specialized for speed
  // *pch is safely the output destination with enough space (ensured via calculating maxLineLen up front)
  // technique similar to base R (format.c:formatReal and printutils.c:EncodeReal0)
  // differences/tricks :
  //   i) no buffers. writes straight to the final file buffer passed to write()
  //  ii) no C library calls such as sprintf() where the fmt string has to be interpreted over and over
  // iii) no need to return variables or flags.  Just writes.
  //  iv) shorter, easier to read and reason with in one self contained place.
  double x = col[row];
  char *ch = *pch;
  if (!isfinite(x)) {
    if (isnan(x)) {
      write_chars(na, &ch);
    } else {
      if (x<0) *ch++ = '-';
      *ch++ = 'I'; *ch++ = 'n'; *ch++ = 'f';
    }
  } else if (x == 0.0) {
    *ch++ = '0';   // and we're done.  so much easier rather than passing back special cases
  } else {
    if (x < 0.0) { *ch++ = '-'; x = -x; }  // and we're done on sign, already written. no need to pass back sign
    // Split the IEEE754 bit pattern into its 52-bit fraction and 11-bit biased exponent
    union { double d; uint64_t l; } u;
    u.d = x;
    uint64_t fraction = u.l & 0xFFFFFFFFFFFFF;           // (1<<52)-1;
    uint32_t exponent = (int32_t)((u.l>>52) & 0x7FF);    // [0,2047]
    // NOTE(review): exponent==0 (subnormal) is not special-cased; the implicit leading 1 is
    // still added below. Confirm that this is acceptable for subnormal inputs.

    // Now sum the appropriate powers 2^-(1:52) of the fraction
    // Important for accuracy to start with the smallest first; i.e. 2^-52
    // Exact powers of 2 (1.0, 2.0, 4.0, etc) are represented precisely with fraction==0
    // Skip over trailing zeros for exactly representable numbers such 0.5, 0.75
    // Underflow here (0u-1u = all 1s) is on an unsigned type which is ok by C standards
    // sigparts[0] arranged to be 0.0 in genLookups() to enable branch free loop here
    double acc = 0;  // 'long double' not needed
    int i = 52;
    if (fraction) {
      while ((fraction & 0xFF) == 0) { fraction >>= 8; i-=8; }
      while (fraction) {
        // index is i when the low bit is set, otherwise 0 (and sigparts[0]==0.0 adds nothing)
        acc += sigparts[(((fraction & 1u)^1u)-1u) & i];
        i--;
        fraction >>= 1;
      }
    }
    // 1.0+acc is in range [1.0,2.0) by IEEE754 (acc==0 when fraction==0)
    // expsig is in range [1.0,10.0) by design of fwriteLookups.h
    // Therefore y in range [1.0,20.0)
    // Avoids (potentially inaccurate and potentially slow) log10/log10l, pow/powl, ldexp/ldexpl
    // By design we can just lookup the power from the tables
    double y = (1.0+acc) * expsig[exponent];  // low magnitude mult
    int exp = exppow[exponent];
    if (y>=9.99999999999999) { y /= 10; exp++; }  // renormalise so that y has one digit before the point
    uint64_t l = y * SIZE_SF;  // low magnitude mult 10^NUM_SF
    // l now contains NUM_SF+1 digits as integer where repeated /10 below is accurate

    // if (verbose) Rprintf(_("\nTRACE: acc=%.20Le ; y=%.20Le ; l=%"PRIu64" ; e=%d     "), acc, y, l, exp);

    if (l%10 >= 5) l+=10; // use the last digit to round
    l /= 10;
    if (l == 0) {
      // value rounded to zero: drop a minus sign that was already written and emit a plain 0
      if (*(ch-1)=='-') ch--;
      *ch++ = '0';
    } else {
      // Count trailing zeros and therefore s.f. present in l
      int trailZero = 0;
      while (l%10 == 0) { l /= 10; trailZero++; }
      int sf = NUM_SF - trailZero;
      if (sf==0) {sf=1; exp++;}  // e.g. l was 9999999[5-9] rounded to 10000000 which added 1 digit

      // l is now an unsigned long that doesn't start or end with 0
      // sf is the number of digits now in l
      // exp is e<exp> were l to be written with the decimal sep after the first digit
      int dr = sf-exp-1; // how many characters to print to the right of the decimal place
      int width=0;       // field width were it written decimal format. Used to decide whether to or not.
      int dl0=0;         // how many 0's to add to the left of the decimal place before starting l
      if (dr<=0) { dl0=-dr; dr=0; width=sf+dl0; }  // 1, 10, 100, 99000
      else {
        if (sf>dr) width=sf+1;                     // 1.234 and 123.4
        else { dl0=1; width=dr+1+dl0; }            // 0.1234, 0.0001234
      }
      // So:  3.1416 => l=31416, sf=5, exp=0     dr=4; dl0=0; width=6
      //      30460  => l=3046, sf=4, exp=4      dr=0; dl0=1; width=5
      //      0.0072 => l=72, sf=2, exp=-3       dr=4; dl0=1; width=6
      // Right hand side is the width of the scientific form plus the scipen bias
      if (width <= sf + (sf>1) + 2 + (abs(exp)>99?3:2) + scipen) {
        //              ^^^^ to not include 1 char for dec in -7e-04 where sf==1
        //                       ^ 2 for 'e+'/'e-'
        // decimal format ...
        // the field is filled right to left, starting from its last character
        ch += width-1;
        if (dr) {
          while (dr && sf) { *ch--='0'+l%10; l/=10; dr--; sf--; }
          while (dr) { *ch--='0'; dr--; }
          *ch-- = dec;
        }
        while (dl0) { *ch--='0'; dl0--; }
        while (sf) { *ch--='0'+l%10; l/=10; sf--; }
        // ch is now 1 before the first char of the field so position it afterward again, and done
        ch += width+1;
      } else {
        // scientific ...
        // mantissa digits are filled right to left, then ch is moved past them to append e+XX
        ch += sf;  // sf-1 + 1 for dec
        for (int i=sf; i>1; i--) {
          *ch-- = '0' + l%10;
          l /= 10;
        }
        if (sf == 1) ch--; else *ch-- = dec;
        *ch = '0' + l;
        ch += sf + (sf>1);
        *ch++ = 'e';  // lower case e to match base::write.csv
        if (exp < 0) { *ch++ = '-'; exp=-exp; }
        else { *ch++ = '+'; }  // to match base::write.csv
        if (exp < 100) {
          // always at least two exponent digits
          *ch++ = '0' + (exp / 10);
          *ch++ = '0' + (exp % 10);
        } else {
          *ch++ = '0' + (exp / 100);
          *ch++ = '0' + (exp / 10) % 10;
          *ch++ = '0' + (exp % 10);
        }
      }
    }
  }
  *pch = ch;
}
302
// Write element `row` of a complex column at *pch and advance *pch.
// The real part is always written with writeFloat64(). When the imaginary part is not NaN
// it follows as <sign><imaginary>i; when it is NaN only the real part's text is written.
void writeComplex(Rcomplex *col, int64_t row, char **pch)
{
  Rcomplex value = col[row];
  char *out = *pch;
  writeFloat64(&value.r, 0, &out);
  if (ISNAN(value.i)) {
    *pch = out;
    return;
  }
  // writeFloat64 supplies the '-' for negatives, so only '+' needs adding here
  if (value.i >= 0.0) *out++ = '+';
  writeFloat64(&value.i, 0, &out);
  *out++ = 'i';
  *pch = out;
}
315
316 // DATE/TIME
317
write_time(int32_t x,char ** pch)318 static inline void write_time(int32_t x, char **pch)
319 // just a helper called below by the real writers (time-only and datetime)
320 {
321 char *ch = *pch;
322 if (x<0) { // <0 covers NA_INTEGER too (==INT_MIN checked in init.c)
323 write_chars(na, &ch);
324 } else {
325 int hh = x/3600;
326 int mm = (x - hh*3600) / 60;
327 int ss = x%60;
328 *ch++ = '0'+hh/10;
329 *ch++ = '0'+hh%10;
330 *ch++ = ':';
331 ch -= squashDateTime;
332 *ch++ = '0'+mm/10;
333 *ch++ = '0'+mm%10;
334 *ch++ = ':';
335 ch -= squashDateTime;
336 *ch++ = '0'+ss/10;
337 *ch++ = '0'+ss%10;
338 }
339 *pch = ch;
340 }
341
// Column writer for time-of-day values: formats element `row` (seconds since midnight)
// through write_time().
void writeITime(int32_t *col, int64_t row, char **pch)
{
  const int32_t secs = col[row];
  write_time(secs, pch);
}
345
static inline void write_date(int32_t x, char **pch)
// just a helper called below by the two real writers (date-only and datetime)
// Writes x, a count of days since 1970-01-01, at *pch as yyyy-mm-dd (or yyyymmdd when
// squashDateTime is set) and advances *pch. Values outside [-719468, 2932896] are written
// as the `na` string.
{
  // From base ?Date :
  //  "  Dates are represented as the number of days since 1970-01-01, with negative values
  // for earlier dates. They are always printed following the rules of the current Gregorian calendar,
  // even though that calendar was not in use long ago (it was adopted in 1752 in Great Britain and its
  // colonies)  "

  // The algorithm here in data.table::fwrite was taken from civil_from_days() here :
  //   http://howardhinnant.github.io/date_algorithms.html
  // which was donated to the public domain thanks to Howard Hinnant, 2013.
  // The rebase to 1 March 0000 is inspired: avoids needing isleap() at all.
  // The only small modifications here are :
  //   1) no need for era
  //   2) impose date range of [0000-03-01, 9999-12-31]. All 3,652,365 dates tested in test 1739
  //   3) use direct lookup for mmdd rather than the math using 153, 2 and 5
  //   4) use true/false value (md/100)<3 rather than ?: branch
  // The end result is 5 lines of simple branch free integer math with no library calls.
  // as.integer(as.Date(c("0000-03-01","9999-12-31"))) == c(-719468,+2932896)

  char *ch = *pch;
  if (x< -719468 || x>2932896) {
    // NA_INTEGER<(-719468) (==INT_MIN checked in init.c)
    write_chars(na, &ch);
  } else {
    x += 719468;  // convert days from 1970-01-01 to days from 0000-03-01 (the day after 29 Feb 0000)
    int y = (x - x/1461 + x/36525 - x/146097) / 365;  // year of the preceding March 1st
    int z =  x - y*365 - y/4 + y/100 - y/400 + 1;     // days from March 1st in year y
    int md = monthday[z];  // See fwriteLookups.h for how the 366 item lookup 'monthday' is arranged
    y += z && (md/100)<3;  // The +1 above turned z=-1 to 0 (meaning Feb29 of year y not Jan or Feb of y+1)

    // The field is filled right to left: last character is at offset 9 (ISO) or 7 (squash).
    // Each '-' is always stored; when squashing, ch is stepped forward again so that the next
    // digit overwrites it.
    ch += 7 + 2*!squashDateTime;
    *ch-- = '0'+md%10; md/=10;
    *ch-- = '0'+md%10; md/=10;
    *ch-- = '-';
    ch += squashDateTime;
    *ch-- = '0'+md%10; md/=10;
    *ch-- = '0'+md%10; md/=10;
    *ch-- = '-';
    ch += squashDateTime;
    *ch-- = '0'+y%10; y/=10;
    *ch-- = '0'+y%10; y/=10;
    *ch-- = '0'+y%10; y/=10;
    *ch   = '0'+y%10; y/=10;
    // ch is back on the first character of the field; move to one past its end
    ch += 8 + 2*!squashDateTime;
  }
  *pch = ch;
}
395
// Column writer for dates held as 32-bit integers (days since 1970-01-01): formats
// element `row` through write_date().
void writeDateInt32(int32_t *col, int64_t row, char **pch)
{
  const int32_t days = col[row];
  write_date(days, pch);
}
399
// Column writer for dates held as doubles (days since 1970-01-01): truncates element `row`
// toward zero to whole days and formats it through write_date().
// NaN, +/-Inf and any value whose truncation does not fit in int32_t are passed on as
// INT32_MIN, which write_date() writes as the `na` string. Converting such a double to an
// integer type directly is undefined behaviour in C, hence the explicit range test before
// the cast.
void writeDateFloat64(double *col, int64_t row, char **pch) {
  const double x = col[row];
  // Open interval whose truncation toward zero lies within [INT32_MIN, INT32_MAX].
  // Both comparisons are false for NaN, and one of them is false for each infinity.
  const bool fits = (x > -2147483649.0) && (x < 2147483648.0);
  write_date(fits ? (int32_t)x : INT32_MIN, pch);
}
403
writePOSIXct(double * col,int64_t row,char ** pch)404 void writePOSIXct(double *col, int64_t row, char **pch)
405 {
406 // Write ISO8601 UTC by default to encourage ISO standards, stymie ambiguity and for speed.
407 // R internally represents POSIX datetime in UTC always. Its 'tzone' attribute can be ignored.
408 // R's representation ignores leap seconds too which is POSIX compliant, convenient and fast.
409 // Aside: an often overlooked option for users is to start R in UTC: $ TZ='UTC' R
410 // All positive integers up to 2^53 (9e15) are exactly representable by double which is relied
411 // on in the ops here; number of seconds since epoch.
412
413 double x = col[row];
414 char *ch = *pch;
415 if (!isfinite(x)) {
416 write_chars(na, &ch);
417 } else {
418 int64_t xi, d, t;
419 if (x>=0) {
420 xi = floor(x);
421 d = xi / 86400;
422 t = xi % 86400;
423 } else {
424 // before 1970-01-01T00:00:00Z
425 xi = floor(x);
426 d = (xi+1)/86400 - 1;
427 t = xi - d*86400; // xi and d are both negative here; t becomes the positive number of seconds into the day
428 }
429 int m = ((x-xi)*10000000); // 7th digit used to round up if 9
430 m += (m%10); // 9 is numerical accuracy, 8 or less then we truncate to last microsecond
431 m /= 10;
432 write_date(d, &ch);
433 *ch++ = 'T';
434 ch -= squashDateTime;
435 write_time(t, &ch);
436 if (squashDateTime || (m && m%1000==0)) {
437 // when squashDateTime always write 3 digits of milliseconds even if 000, for consistent scale of squash integer64
438 // don't use writeInteger() because it doesn't 0 pad which we need here
439 // integer64 is big enough for squash with milli but not micro; trunc (not round) micro when squash
440 m /= 1000;
441 *ch++ = '.';
442 ch -= squashDateTime;
443 *(ch+2) = '0'+m%10; m/=10;
444 *(ch+1) = '0'+m%10; m/=10;
445 *ch = '0'+m;
446 ch += 3;
447 } else if (m) {
448 // microseconds are present and !squashDateTime
449 *ch++ = '.';
450 *(ch+5) = '0'+m%10; m/=10;
451 *(ch+4) = '0'+m%10; m/=10;
452 *(ch+3) = '0'+m%10; m/=10;
453 *(ch+2) = '0'+m%10; m/=10;
454 *(ch+1) = '0'+m%10; m/=10;
455 *ch = '0'+m;
456 ch += 6;
457 }
458 *ch++ = 'Z';
459 ch -= squashDateTime;
460 }
461 *pch = ch;
462 }
463
writeNanotime(int64_t * col,int64_t row,char ** pch)464 void writeNanotime(int64_t *col, int64_t row, char **pch)
465 {
466 int64_t x = col[row];
467 char *ch = *pch;
468 if (x == INT64_MIN) {
469 write_chars(na, &ch);
470 } else {
471 int d/*days*/, s/*secs*/, n/*nanos*/;
472 n = x % 1000000000;
473 x /= 1000000000;
474 if (x>=0 && n>=0) {
475 d = x / 86400;
476 s = x % 86400;
477 } else {
478 // before 1970-01-01T00:00:00.000000000Z
479 if (n) { x--; n += 1000000000; }
480 d = (x+1)/86400 - 1;
481 s = x - d*86400; // x and d are both negative here; secs becomes the positive number of seconds into the day
482 }
483 write_date(d, &ch);
484 *ch++ = 'T';
485 ch -= squashDateTime;
486 write_time(s, &ch);
487 *ch++ = '.';
488 ch -= squashDateTime;
489 for (int i=8; i>=0; i--) { *(ch+i) = '0'+n%10; n/=10; } // always 9 digits for nanoseconds
490 ch += 9;
491 *ch++ = 'Z';
492 ch -= squashDateTime;
493 }
494 *pch = ch;
495 }
496
write_string(const char * x,char ** pch)497 static inline void write_string(const char *x, char **pch)
498 {
499 char *ch = *pch;
500 if (x == NULL) {
501 // NA is not quoted even when quote=TRUE to distinguish from quoted "NA" value. But going forward: ,,==NA and ,"",==empty string
502 write_chars(na, &ch);
503 } else {
504 int8_t q = doQuote;
505 if (q==INT8_MIN) { // NA means quote="auto"
506 const char *tt = x;
507 if (*tt=='\0') {
508 // Empty strings are always quoted to distinguish from ,,==NA
509 *ch++='"'; *ch++='"'; // test 1732.7 covers this (confirmed in gdb) so it's unknown why codecov claims no coverage
510 *pch = ch;
511 return;
512 }
513 while (*tt!='\0' && *tt!=sep && *tt!=sep2 && *tt!='\n' && *tt!='\r' && *tt!='"') *ch++ = *tt++;
514 // Windows includes \n in its \r\n so looking for \n only is sufficient
515 // sep2 is set to '\0' when no list columns are present
516 if (*tt=='\0') {
517 // most common case: no sep, newline or " contained in string
518 *pch = ch; // advance caller over the field already written
519 return;
520 }
521 ch = *pch; // rewind the field written since it needs to be quoted
522 q = true;
523 }
524 if (q==false) {
525 write_chars(x, &ch);
526 } else {
527 *ch++ = '"';
528 const char *tt = x;
529 if (qmethodEscape) {
530 while (*tt!='\0') {
531 if (*tt=='"' || *tt=='\\') *ch++ = '\\';
532 *ch++ = *tt++;
533 }
534 } else {
535 // qmethod='double'
536 while (*tt!='\0') {
537 if (*tt=='"') *ch++ = '"';
538 *ch++ = *tt++;
539 }
540 }
541 *ch++ = '"';
542 }
543 }
544 *pch = ch;
545 }
546
// Column writer for string columns: fetches element `row` with getString() and writes it
// as a field through write_string().
void writeString(const void *col, int64_t row, char **pch)
{
  const char *text = getString(col, row);
  write_string(text, pch);
}
551
// Column writer for categorical columns: fetches the text for element `row` with
// getCategString() and writes it as a field through write_string().
void writeCategString(const void *col, int64_t row, char **pch)
{
  const char *label = getCategString(col, row);
  write_string(label, pch);
}
556
557 #ifndef NOZLIB
// Prepare *stream for gzip-format deflate at the default compression level.
// Returns the result of deflateInit2(); the callers in this file treat non-zero as failure.
int init_stream(z_stream *stream)
{
  // 15 is the window bits; adding 16 selects the gzip format. Together: 31
  enum { GZIP_WINDOW_BITS = 15 | 16, MEM_LEVEL = 8 };

  memset(stream, 0, sizeof *stream); // shouldn't be needed, done as part of #4099 to be sure
  stream->opaque = Z_NULL;
  stream->zfree = Z_NULL;
  stream->zalloc = Z_NULL;
  stream->next_in = Z_NULL;

  return deflateInit2(stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, GZIP_WINDOW_BITS, MEM_LEVEL, Z_DEFAULT_STRATEGY);  // # nocov
}
569
// Compress sourceLen bytes at `source` into `dest` with a single deflate(Z_FINISH) call.
//   destLen : in  - capacity of dest in bytes
//             out - set to stream->total_out after the call
// Returns Z_OK (0) when deflate reported Z_STREAM_END, -9 when deflate returned Z_OK
// (with Z_FINISH that means the stream did not finish), and deflate's own code otherwise.
int compressbuff(z_stream *stream, void* dest, size_t *destLen, const void* source, size_t sourceLen)
{
  stream->next_in = (Bytef *)source; // don't use z_const anywhere; #3939
  stream->avail_in = sourceLen;
  stream->next_out = dest;
  stream->avail_out = *destLen;

  const int rc = deflate(stream, Z_FINISH);
  *destLen = stream->total_out;

  if (rc == Z_STREAM_END) return Z_OK;
  // with Z_FINISH, deflate must return Z_STREAM_END if correct, otherwise it's an error and we shouldn't return Z_OK (0)
  if (rc == Z_OK) return -9;  // # nocov
  return rc;
}
584 #endif
585
fwriteMain(fwriteMainArgs args)586 void fwriteMain(fwriteMainArgs args)
587 {
588 double startTime = wallclock();
589 double nextTime = startTime+2; // start printing progress meter in 2 sec if not completed by then
590
591 na = args.na;
592 sep = args.sep;
593 sep2 = args.sep2;
594 dec = args.dec;
595 scipen = args.scipen;
596 doQuote = args.doQuote;
597 verbose = args.verbose;
598
599 // When NA is a non-empty string, then we must quote all string fields in case they contain the na string
600 // na is recommended to be empty, though
601 if (na[0]!='\0' && doQuote==INT8_MIN) doQuote = true;
602
603 qmethodEscape = args.qmethodEscape;
604 squashDateTime = args.squashDateTime;
605
606 if (args.buffMB<1 || args.buffMB>1024) STOP(_("buffMB=%d outside [1,1024]"), args.buffMB);
607 size_t buffSize = (size_t)1024*1024*args.buffMB;
608
609 int eolLen=strlen(args.eol), naLen=strlen(args.na);
610 // Aside: codacy wants strnlen but strnlen is not in C99 (neither is strlen_s). To pass `gcc -std=c99 -Wall -pedantic`
611 // we'd need `#define _POSIX_C_SOURCE 200809L` before #include <string.h> but that seems a step too far
612 // and platform specific. We prefer to be pure C99.
613 if (eolLen<=0) STOP(_("eol must be 1 or more bytes (usually either \\n or \\r\\n) but is length %d"), eolLen);
614
615 if (verbose) {
616 DTPRINT(_("Column writers: "));
617 if (args.ncol<=50) {
618 for (int j=0; j<args.ncol; j++) DTPRINT(_("%d "), args.whichFun[j]);
619 } else {
620 for (int j=0; j<30; j++) DTPRINT(_("%d "), args.whichFun[j]);
621 DTPRINT(_("... "));
622 for (int j=args.ncol-10; j<args.ncol; j++) DTPRINT(_("%d "), args.whichFun[j]);
623 }
624 DTPRINT(_("\nargs.doRowNames=%d args.rowNames=%d doQuote=%d args.nrow=%"PRId64" args.ncol=%d eolLen=%d\n"),
625 args.doRowNames, args.rowNames, doQuote, args.nrow, args.ncol, eolLen);
626 }
627
628 // Calculate upper bound for line length. Numbers use a fixed maximum (e.g. 12 for integer) while strings find the longest
629 // string in each column. Upper bound is then the sum of the columns' max widths.
630 // This upper bound is required to determine a reasonable rowsPerBatch. It also saves needing to grow the buffers which
631 // is especially tricky when compressing, and saves needing to check/limit the buffer writing because we know
632 // up front the buffer does have sufficient capacity.
633 // A large overestimate (e.g. 2-5x too big) is ok, provided it is not so large that the buffers can't be allocated.
634 // Do this first so that, for example, any unsupported types in list columns happen first before opening file (which
635 // could be console output) and writing column names to it.
636
637 double t0 = wallclock();
638 size_t maxLineLen = eolLen + args.ncol*(2*(doQuote!=0) + 1/*sep*/);
639 if (args.doRowNames) {
640 maxLineLen += args.rowNames ? getMaxStringLen(args.rowNames, args.nrow)*2 : 1+(int)log10(args.nrow); // the width of the row number
641 maxLineLen += 2*(doQuote!=0/*NA('auto') or true*/) + 1/*sep*/;
642 }
643 for (int j=0; j<args.ncol; j++) {
644 int width = writerMaxLen[args.whichFun[j]];
645 if (width==0) {
646 switch(args.whichFun[j]) {
647 case WF_String:
648 width = getMaxStringLen(args.columns[j], args.nrow);
649 break;
650 case WF_CategString:
651 width = getMaxCategLen(args.columns[j]);
652 break;
653 case WF_List:
654 width = getMaxListItemLen(args.columns[j], args.nrow);
655 break;
656 default:
657 STOP(_("Internal error: type %d has no max length method implemented"), args.whichFun[j]); // # nocov
658 }
659 }
660 if (args.whichFun[j]==WF_Float64 && args.scipen>0) width+=MIN(args.scipen,350); // clamp width to IEEE754 max to avoid scipen=99999 allocating buffer larger than can ever be written
661 if (width<naLen) width = naLen;
662 maxLineLen += width*2; // *2 in case the longest string is all quotes and they all need to be escaped
663 }
664 if (verbose) DTPRINT(_("maxLineLen=%"PRIu64". Found in %.3fs\n"), (uint64_t)maxLineLen, 1.0*(wallclock()-t0));
665
666 int f=0;
667 if (*args.filename=='\0') {
668 f=-1; // file="" means write to standard output
669 args.is_gzip = false; // gzip is only for file
670 // eol = "\n"; // We'll use DTPRINT which converts \n to \r\n inside it on Windows
671 } else {
672 #ifdef WIN32
673 f = _open(args.filename, _O_WRONLY | _O_BINARY | _O_CREAT | (args.append ? _O_APPEND : _O_TRUNC), _S_IWRITE);
674 // O_BINARY rather than O_TEXT for explicit control and speed since it seems that write() has a branch inside it
675 // to convert \n to \r\n on Windows when in text mode not not when in binary mode.
676 #else
677 f = open(args.filename, O_WRONLY | O_CREAT | (args.append ? O_APPEND : O_TRUNC), 0666);
678 // There is no binary/text mode distinction on Linux and Mac
679 #endif
680 if (f == -1) {
681 // # nocov start
682 int erropen = errno;
683 STOP(access( args.filename, F_OK ) != -1 ?
684 _("%s: '%s'. Failed to open existing file for writing. Do you have write permission to it? Is this Windows and does another process such as Excel have it open?") :
685 _("%s: '%s'. Unable to create new file for writing (it does not exist already). Do you have permission to write here, is there space on the disk and does the path exist?"),
686 strerror(erropen), args.filename);
687 // # nocov end
688 }
689 }
690 #ifdef NOZLIB
691 if (args.is_gzip)
692 STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov
693 #endif
694
695 int yamlLen = strlen(args.yaml);
696 if (verbose) {
697 DTPRINT(_("Writing bom (%s), yaml (%d characters) and column names (%s) ... "),
698 args.bom?"true":"false", yamlLen, args.colNames?"true":"false");
699 if (f==-1) DTPRINT(_("\n"));
700 }
701 size_t headerLen = 0;
702 if (args.bom) headerLen += 3;
703 headerLen += yamlLen;
704 if (args.colNames) {
705 for (int j=0; j<args.ncol; j++) headerLen += getStringLen(args.colNames, j)*2; // *2 in case quotes are escaped or doubled
706 headerLen += args.ncol*(1/*sep*/+(doQuote!=0)*2) + eolLen + 3; // 3 in case doRowNames and doQuote (the first blank <<"",>> column name)
707 }
708 if (headerLen) {
709 char *buff = malloc(headerLen);
710 if (!buff) STOP(_("Unable to allocate %d MiB for header: %s"), headerLen / 1024 / 1024, strerror(errno));
711 char *ch = buff;
712 if (args.bom) {*ch++=(char)0xEF; *ch++=(char)0xBB; *ch++=(char)0xBF; } // 3 appears above (search for "bom")
713 memcpy(ch, args.yaml, yamlLen);
714 ch += yamlLen;
715 if (args.colNames) {
716 if (args.doRowNames) {
717 // Unusual: the extra blank column name when row_names are added as the first column
718 if (doQuote!=0/*'auto'(NA) or true*/) { *ch++='"'; *ch++='"'; } // to match write.csv
719 *ch++ = sep;
720 }
721 for (int j=0; j<args.ncol; j++) {
722 writeString(args.colNames, j, &ch);
723 *ch++ = sep;
724 }
725 ch--; // backup over the last sep
726 write_chars(args.eol, &ch);
727 }
728 if (f==-1) {
729 *ch = '\0';
730 DTPRINT(buff);
731 free(buff);
732 } else {
733 int ret1=0, ret2=0;
734 if (args.is_gzip) {
735 #ifndef NOZLIB
736 z_stream stream = {0};
737 if(init_stream(&stream)) {
738 free(buff); // # nocov
739 STOP(_("Can't allocate gzip stream structure")); // # nocov
740 }
741 size_t zbuffSize = deflateBound(&stream, headerLen);
742 char *zbuff = malloc(zbuffSize);
743 if (!zbuff) {
744 free(buff); // # nocov
745 STOP(_("Unable to allocate %d MiB for zbuffer: %s"), zbuffSize / 1024 / 1024, strerror(errno)); // # nocov
746 }
747 size_t zbuffUsed = zbuffSize;
748 ret1 = compressbuff(&stream, zbuff, &zbuffUsed, buff, (size_t)(ch-buff));
749 if (ret1==Z_OK) ret2 = WRITE(f, zbuff, (int)zbuffUsed);
750 deflateEnd(&stream);
751 free(zbuff);
752 #endif
753 } else {
754 ret2 = WRITE(f, buff, (int)(ch-buff));
755 }
756 free(buff);
757 if (ret1 || ret2==-1) {
758 // # nocov start
759 int errwrite = errno; // capture write errno now incase close fails with a different errno
760 CLOSE(f);
761 if (ret1) STOP(_("Compress gzip error: %d"), ret1);
762 else STOP(_("%s: '%s'"), strerror(errwrite), args.filename);
763 // # nocov end
764 }
765 }
766 }
767 if (verbose) DTPRINT(_("done in %.3fs\n"), 1.0*(wallclock()-t0));
768 if (args.nrow == 0) {
769 if (verbose) DTPRINT(_("No data rows present (nrow==0)\n"));
770 if (f!=-1 && CLOSE(f)) STOP(_("%s: '%s'"), strerror(errno), args.filename);
771 return;
772 }
773
774 // Writing rows
775
776 // Decide buffer size and rowsPerBatch for each thread
777 // Once rowsPerBatch is decided it can't be changed
778 int rowsPerBatch=0;
779 if (maxLineLen*2>buffSize) { buffSize=2*maxLineLen; rowsPerBatch=2; }
780 else rowsPerBatch = buffSize / maxLineLen;
781 if (rowsPerBatch > args.nrow) rowsPerBatch = args.nrow;
782 if (rowsPerBatch < 1) rowsPerBatch = 1;
783 int numBatches = (args.nrow-1)/rowsPerBatch + 1;
784 int nth = args.nth;
785 if (numBatches < nth) nth = numBatches;
786 if (verbose) {
787 DTPRINT(_("Writing %"PRId64" rows in %d batches of %d rows (each buffer size %dMB, showProgress=%d, nth=%d)\n"),
788 args.nrow, numBatches, rowsPerBatch, args.buffMB, args.showProgress, nth);
789 }
790 t0 = wallclock();
791
792 bool hasPrinted = false;
793 int maxBuffUsedPC = 0;
794
795 // compute zbuffSize which is the same for each thread
796 size_t zbuffSize = 0;
797 if(args.is_gzip){
798 #ifndef NOZLIB
799 z_stream stream = {0};
800 if(init_stream(&stream))
801 STOP(_("Can't allocate gzip stream structure")); // # nocov
802 zbuffSize = deflateBound(&stream, buffSize);
803 if (verbose) DTPRINT("zbuffSize=%d returned from deflateBound\n", (int)zbuffSize);
804 deflateEnd(&stream);
805 #endif
806 }
807
808 errno=0;
809 char *buffPool = malloc(nth*(size_t)buffSize);
810 if (!buffPool) {
811 // # nocov start
812 STOP(_("Unable to allocate %d MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."),
813 (size_t)buffSize/(1024^2), nth, errno, strerror(errno));
814 // # nocov end
815 }
816 char *zbuffPool = NULL;
817 if (args.is_gzip) {
818 zbuffPool = malloc(nth*(size_t)zbuffSize);
819 #ifndef NOZLIB
820 if (!zbuffPool) {
821 // # nocov start
822 free(buffPool);
823 STOP(_("Unable to allocate %d MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."),
824 (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno));
825 // # nocov end
826 }
827 #endif
828 }
829
830 bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm
831 int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section
832 int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931
833
834 #ifndef NOZLIB
835 z_stream thread_streams[nth];
836 // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit
837 // not declared inside the parallel region because solaris appears to move the struct in
838 // memory when the #pragma omp for is entered, which causes zlib's internal self reference
839 // pointer to mismatch, #4099
840 char failed_msg[1001] = ""; // to hold zlib's msg; copied out of zlib in ordered section just in case the msg is allocated within zlib
841 #endif
842
843 #pragma omp parallel num_threads(nth)
844 {
845 int me = omp_get_thread_num();
846 int my_failed_compress = 0;
847 char *ch, *myBuff;
848 ch = myBuff = buffPool + me*buffSize;
849
850 void *myzBuff = NULL;
851 size_t myzbuffUsed = 0;
852 #ifndef NOZLIB
853 z_stream *mystream = &thread_streams[me];
854 if (args.is_gzip) {
855 myzBuff = zbuffPool + me*zbuffSize;
856 if (init_stream(mystream)) { // this should be thread safe according to zlib documentation
857 failed = true; // # nocov
858 my_failed_compress = -998; // # nocov
859 }
860 }
861 #endif
862
863 #pragma omp for ordered schedule(dynamic)
864 for(int64_t start=0; start<args.nrow; start+=rowsPerBatch) {
865 if (failed) continue; // Not break. Because we don't use #omp cancel yet.
866 int64_t end = ((args.nrow - start)<rowsPerBatch) ? args.nrow : start + rowsPerBatch;
867 for (int64_t i=start; i<end; i++) {
868 // Tepid starts here (once at beginning of each per line)
869 if (args.doRowNames) {
870 if (args.rowNames==NULL) {
871 if (doQuote!=0/*NA'auto' or true*/) *ch++='"';
872 int64_t rn = i+1;
873 writeInt64(&rn, 0, &ch);
874 if (doQuote!=0) *ch++='"';
875 } else {
876 writeString(args.rowNames, i, &ch);
877 }
878 *ch++=sep;
879 }
880 // Hot loop
881 for (int j=0; j<args.ncol; j++) {
882 (args.funs[args.whichFun[j]])(args.columns[j], i, &ch);
883 *ch++ = sep;
884 }
885 // Tepid again (once at the end of each line)
886 ch--; // backup onto the last sep after the last column. ncol>=1 because 0-columns was caught earlier.
887 write_chars(args.eol, &ch); // overwrite last sep with eol instead
888 }
889 // compress buffer if gzip
890 #ifndef NOZLIB
891 if (args.is_gzip && !failed) {
892 myzbuffUsed = zbuffSize;
893 int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, (size_t)(ch-myBuff));
894 if (ret) { failed=true; my_failed_compress=ret; }
895 else deflateReset(mystream);
896 }
897 #endif
898 #pragma omp ordered
899 {
900 if (failed) {
901 // # nocov start
902 if (failed_compress==0 && my_failed_compress!=0) {
903 failed_compress = my_failed_compress;
904 #ifndef NOZLIB
905 if (mystream->msg!=NULL) strncpy(failed_msg, mystream->msg, 1000); // copy zlib's msg for safe use after deflateEnd just in case zlib allocated the message
906 #endif
907 }
908 // else another thread could have failed below while I was working or waiting above; their reason got here first
909 // # nocov end
910 } else {
911 errno=0;
912 if (f==-1) {
913 *ch='\0'; // standard C string end marker so DTPRINT knows where to stop
914 DTPRINT(myBuff);
915 } else if ((args.is_gzip ? WRITE(f, myzBuff, (int)myzbuffUsed)
916 : WRITE(f, myBuff, (int)(ch-myBuff))) == -1) {
917 failed=true; // # nocov
918 failed_write=errno; // # nocov
919 }
920
921 int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB
922 if (used > maxBuffUsedPC) maxBuffUsedPC = used;
923 double now;
924 if (me==0 && args.showProgress && (now=wallclock())>=nextTime && !failed) {
925 // See comments above inside the f==-1 clause.
926 // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the
927 // master thread (me==0) and hopefully this will work on Windows. If not, user should set
928 // showProgress=FALSE until this can be fixed or removed.
929 // # nocov start
930 int ETA = (int)((args.nrow-end)*((now-startTime)/end));
931 if (hasPrinted || ETA >= 2) {
932 if (verbose && !hasPrinted) DTPRINT("\n");
933 DTPRINT("\rWritten %.1f%% of %"PRId64" rows in %d secs using %d thread%s. "
934 "maxBuffUsed=%d%%. ETA %d secs. ",
935 (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, nth==1?"":"s",
936 maxBuffUsedPC, ETA);
937 // TODO: use progress() as in fread
938 nextTime = now+1;
939 hasPrinted = true;
940 }
941 // # nocov end
942 }
943 // May be possible for master thread (me==0) to call R_CheckUserInterrupt() here.
944 // Something like:
945 // if (me==0) {
946 // failed = TRUE; // inside ordered here; the slaves are before ordered and not looking at 'failed'
947 // R_CheckUserInterrupt();
948 // failed = FALSE; // no user interrupt so return state
949 // }
950 // But I fear the slaves will hang waiting for the master (me==0) to complete the ordered
951 // section which may not happen if the master thread has been interrupted. Rather than
952 // seeing failed=TRUE and falling through to free() and close() as intended.
953 // Could register a finalizer to free() and close() perhaps :
954 // [r-devel] http://r.789695.n4.nabble.com/checking-user-interrupts-in-C-code-tp2717528p2717722.html
955 // Conclusion for now: do not provide ability to interrupt.
956 // write() errors and malloc() fails will be caught and cleaned up properly, however.
957 ch = myBuff; // back to the start of my buffer ready to fill it up again
958 }
959 }
960 }
961 // all threads will call this free on their buffer, even if one or more threads had malloc
962 // or realloc fail. If the initial malloc failed, free(NULL) is ok and does nothing.
963 if (args.is_gzip) {
964 #ifndef NOZLIB
965 deflateEnd(mystream);
966 #endif
967 }
968 }
969 free(buffPool);
970 free(zbuffPool);
971
972 // Finished parallel region and can call R API safely now.
973 if (hasPrinted) {
974 // # nocov start
975 if (!failed) { // clear the progress meter
976 DTPRINT("\r "
977 " \r");
978 } else { // don't clear any potentially helpful output before error
979 DTPRINT("\n");
980 }
981 // # nocov end
982 }
983
984 if (f!=-1 && CLOSE(f) && !failed)
985 STOP("%s: '%s'", strerror(errno), args.filename); // # nocov
986 // quoted '%s' in case of trailing spaces in the filename
987 // If a write failed, the line above tries close() to clean up, but that might fail as well. So the
988 // '&& !failed' is to not report the error as just 'closing file' but the next line for more detail
989 // from the original error.
990 if (failed) {
991 // # nocov start
992 #ifndef NOZLIB
993 if (failed_compress)
994 STOP(_("zlib %s (zlib.h %s) deflate() returned error %d with z_stream->msg==\"%s\" Z_FINISH=%d Z_BLOCK=%d. %s"),
995 zlibVersion(), ZLIB_VERSION, failed_compress, failed_msg, Z_FINISH, Z_BLOCK,
996 verbose ? _("Please include the full output above and below this message in your data.table bug report.")
997 : _("Please retry fwrite() with verbose=TRUE and include the full output with your data.table bug report."));
998 #endif
999 if (failed_write)
1000 STOP("%s: '%s'", strerror(failed_write), args.filename);
1001 // # nocov end
1002 }
1003 }
1004
1005
1006