1 // This library is part of PLINK 2.00, copyright (C) 2005-2020 Shaun Purcell,
2 // Christopher Chang.
3 //
4 // This library is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by the
6 // Free Software Foundation, either version 3 of the License, or (at your
7 // option) any later version.
8 //
9 // This library is distributed in the hope that it will be useful, but WITHOUT
10 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 // FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
12 // for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public License
15 // along with this library.  If not, see <http://www.gnu.org/licenses/>.
16 
17 #include <errno.h>
18 #include "plink2_text.h"
19 
20 #ifdef __cplusplus
21 namespace plink2 {
22 #endif
23 
GetTxfp(textFILE * txf_ptr)24 static inline textFILEMain* GetTxfp(textFILE* txf_ptr) {
25   return &GET_PRIVATE(*txf_ptr, m);
26 }
27 
GetTxsp(TextStream * txs_ptr)28 static inline TextStreamMain* GetTxsp(TextStream* txs_ptr) {
29   return &GET_PRIVATE(*txs_ptr, m);
30 }
31 
GetTxspK(const TextStream * txs_ptr)32 static inline const TextStreamMain* GetTxspK(const TextStream* txs_ptr) {
33   return &GET_PRIVATE(*txs_ptr, m);
34 }
35 
GetFileType(const char * fname,FileCompressionType * ftype_ptr)36 PglErr GetFileType(const char* fname, FileCompressionType* ftype_ptr) {
37   FILE* infile = fopen(fname, FOPEN_RB);
38   if (unlikely(!infile)) {
39     // Note that this does not print an error message (since it may be called
40     // by a worker thread).
41     return kPglRetOpenFail;
42   }
43   unsigned char buf[16];
44   const uint32_t nbytes = fread_unlocked(buf, 1, 16, infile);
45   if (unlikely(ferror_unlocked(infile) || fclose(infile))) {
46     return kPglRetReadFail;
47   }
48   if (nbytes < 4) {
49     *ftype_ptr = kFileUncompressed;
50     return kPglRetSuccess;
51   }
52   uint32_t magic4;
53   memcpy(&magic4, buf, 4);
54 
55   if (IsZstdFrame(magic4)) {
56     *ftype_ptr = kFileZstd;
57     return kPglRetSuccess;
58   }
59   if (S_CAST(uint16_t, magic4) != 0x8b1f) { // gzip ID1/ID2 bytes
60     *ftype_ptr = kFileUncompressed;
61     return kPglRetSuccess;
62   }
63   if ((nbytes == 16) && IsBgzfHeader(buf)) {
64     *ftype_ptr = kFileBgzf;
65   } else {
66     *ftype_ptr = kFileGzip;
67   }
68   return kPglRetSuccess;
69 }
70 
EraseTextFileBase(TextFileBase * trbp)71 void EraseTextFileBase(TextFileBase* trbp) {
72   trbp->consume_iter = nullptr;
73   trbp->consume_stop = nullptr;
74   trbp->errmsg = nullptr;
75   trbp->reterr = kPglRetEof;
76   trbp->ff = nullptr;
77   trbp->dst = nullptr;
78 }
79 
PreinitTextFile(textFILE * txf_ptr)80 void PreinitTextFile(textFILE* txf_ptr) {
81   EraseTextFileBase(&GetTxfp(txf_ptr)->base);
82 }
83 
GzRawInit(const void * buf,uint32_t nbytes,GzRawDecompressStream * gzp)84 BoolErr GzRawInit(const void* buf, uint32_t nbytes, GzRawDecompressStream* gzp) {
85   gzp->ds_initialized = 0;
86   gzp->in = S_CAST(unsigned char*, malloc(kDecompressChunkSize));
87   if (!gzp->in) {
88     return 1;
89   }
90   z_stream* dsp = &gzp->ds;
91   memcpy(gzp->in, buf, nbytes);
92   dsp->next_in = gzp->in;
93   dsp->avail_in = nbytes;
94   dsp->zalloc = nullptr;
95   dsp->zfree = nullptr;
96   dsp->opaque = nullptr;
97   if (unlikely(inflateInit2(dsp, MAX_WBITS | 16) != Z_OK)) {
98     return 1;
99   }
100   gzp->ds_initialized = 1;
101   return 0;
102 }
103 
ZstRawInit(const void * buf,uint32_t nbytes,ZstRawDecompressStream * zstp)104 BoolErr ZstRawInit(const void* buf, uint32_t nbytes, ZstRawDecompressStream* zstp) {
105   zstp->ib.src = malloc(kDecompressChunkSize);
106   if (unlikely(!zstp->ib.src)) {
107     zstp->ds = nullptr;
108     return 1;
109   }
110   zstp->ds = ZSTD_createDStream();
111   if (unlikely(!zstp->ds)) {
112     return 1;
113   }
114   memcpy(K_CAST(void*, zstp->ib.src), buf, nbytes);
115   zstp->ib.size = nbytes;
116   zstp->ib.pos = 0;
117   return 0;
118 }
119 
// Messages stored in TextFileBase.errmsg by TextFileOpenInternal() when it
// returns kPglRetImproperFunctionCall.
const char kShortErrRfileAlreadyOpen[] = "TextFileOpenInternal can't be called on an already-open file";
const char kShortErrRfileEnforcedMaxBlenTooSmall[] = "TextFileOpenInternal: enforced_max_line_blen too small (must be at least max(1 MiB, dst_capacity - 1 MiB))";
const char kShortErrRfileDstCapacityTooSmall[] = "TextFileOpenInternal: dst_capacity too small (2 MiB minimum)";
123 
TextFileOpenInternal(const char * fname,uint32_t enforced_max_line_blen,uint32_t dst_capacity,char * dst,textFILEMain * txfp,TextStreamMain * txsp)124 PglErr TextFileOpenInternal(const char* fname, uint32_t enforced_max_line_blen, uint32_t dst_capacity, char* dst, textFILEMain* txfp, TextStreamMain* txsp) {
125   PglErr reterr = kPglRetSuccess;
126   TextFileBase* trbp;
127   if (txfp) {
128     trbp = &txfp->base;
129   } else {
130     trbp = &txsp->base;
131   }
132   {
133     // 1. Open file, get type.
134     if (unlikely(trbp->ff)) {
135       reterr = kPglRetImproperFunctionCall;
136       trbp->errmsg = kShortErrRfileAlreadyOpen;
137       goto TextFileOpenInternal_ret_1;
138     }
139     if (enforced_max_line_blen || txfp) {
140       if (unlikely(enforced_max_line_blen < kDecompressMinBlen)) {
141         reterr = kPglRetImproperFunctionCall;
142         trbp->errmsg = kShortErrRfileEnforcedMaxBlenTooSmall;
143         goto TextFileOpenInternal_ret_1;
144       }
145       if (dst) {
146         if (unlikely(dst_capacity < kDecompressMinCapacity)) {
147           reterr = kPglRetImproperFunctionCall;
148           trbp->errmsg = kShortErrRfileDstCapacityTooSmall;
149           goto TextFileOpenInternal_ret_1;
150         }
151         if (unlikely(enforced_max_line_blen + kDecompressChunkSize < dst_capacity)) {
152           reterr = kPglRetImproperFunctionCall;
153           trbp->errmsg = kShortErrRfileEnforcedMaxBlenTooSmall;
154           goto TextFileOpenInternal_ret_1;
155         }
156       }
157     } else {
158       // token-reading mode.  dst == nullptr not currently supported.
159       assert(dst && (dst_capacity == kTokenStreamBlen));
160     }
161     trbp->ff = fopen(fname, FOPEN_RB);
162     if (unlikely(!trbp->ff)) {
163       goto TextFileOpenInternal_ret_OPEN_FAIL;
164     }
165     trbp->file_type = kFileUncompressed;
166     if (dst) {
167       trbp->dst_owned_by_consumer = 1;
168       trbp->dst_capacity = dst_capacity;
169     } else {
170       dst = S_CAST(char*, malloc(kDecompressMinCapacity));
171       if (unlikely(dst == nullptr)) {
172         goto TextFileOpenInternal_ret_NOMEM;
173       }
174       trbp->dst_owned_by_consumer = 0;
175       trbp->dst_capacity = kDecompressMinCapacity;
176     }
177     trbp->dst = dst;
178     uint32_t nbytes = fread_unlocked(dst, 1, 16, trbp->ff);
179     trbp->dst_len = nbytes;
180     trbp->enforced_max_line_blen = enforced_max_line_blen;
181     trbp->consume_iter = dst;
182     trbp->consume_stop = dst;
183     if (nbytes >= 4) {
184       const uint32_t magic4 = *R_CAST(uint32_t*, dst);
185       if (IsZstdFrame(magic4)) {
186         trbp->dst_len = 0;
187         trbp->file_type = kFileZstd;
188         ZstRawDecompressStream* zstp;
189         if (txfp) {
190           zstp = &txfp->rds.zst;
191         } else {
192           zstp = &txsp->rds.zst;
193         }
194         if (unlikely(ZstRawInit(dst, nbytes, zstp))) {
195           goto TextFileOpenInternal_ret_NOMEM;
196         }
197       } else if ((magic4 << 8) == 0x088b1f00) {
198         // gzip ID1/ID2 bytes, deflate compression method
199         trbp->dst_len = 0;
200         if ((nbytes == 16) && IsBgzfHeader(dst)) {
201           trbp->file_type = kFileBgzf;
202           if (txfp) {
203             BgzfRawDecompressStream* bgzfp = &txfp->rds.bgzf;
204             bgzfp->in = S_CAST(unsigned char*, malloc(kDecompressChunkSize));
205             if (unlikely(!bgzfp->in)) {
206               bgzfp->ldc = nullptr;
207               goto TextFileOpenInternal_ret_NOMEM;
208             }
209             bgzfp->ldc = libdeflate_alloc_decompressor();
210             if (!bgzfp->ldc) {
211               goto TextFileOpenInternal_ret_NOMEM;
212             }
213             memcpy(bgzfp->in, dst, nbytes);
214             bgzfp->in_size = nbytes;
215             bgzfp->in_pos = 0;
216           } else {
217             reterr = BgzfRawMtStreamInit(dst, txsp->decompress_thread_ct, trbp->ff, nullptr, &txsp->rds.bgzf, &trbp->errmsg);
218             if (unlikely(reterr)) {
219               goto TextFileOpenInternal_ret_1;
220             }
221           }
222         } else {
223           trbp->file_type = kFileGzip;
224           GzRawDecompressStream* gzp;
225           if (txfp) {
226             gzp = &txfp->rds.gz;
227           } else {
228             gzp = &txsp->rds.gz;
229           }
230           if (unlikely(GzRawInit(dst, nbytes, gzp))) {
231             goto TextFileOpenInternal_ret_NOMEM;
232           }
233         }
234       }
235     } else if (!nbytes) {
236       if (unlikely(!feof_unlocked(trbp->ff))) {
237         goto TextFileOpenInternal_ret_READ_FAIL;
238       }
239       // May as well accept this.
240       // Don't jump to ret_1 since we're setting txfp->reterr to a different
241       // value than we're returning.
242       trbp->reterr = kPglRetEof;
243       return kPglRetSuccess;
244     }
245   }
246   while (0) {
247   TextFileOpenInternal_ret_NOMEM:
248     reterr = kPglRetNomem;
249     break;
250   TextFileOpenInternal_ret_OPEN_FAIL:
251     reterr = kPglRetOpenFail;
252     trbp->errmsg = strerror(errno);
253     break;
254   TextFileOpenInternal_ret_READ_FAIL:
255     reterr = kPglRetReadFail;
256     trbp->errmsg = strerror(errno);
257     break;
258   }
259  TextFileOpenInternal_ret_1:
260   trbp->reterr = reterr;
261   return reterr;
262 }
263 
TextFileOpenEx(const char * fname,uint32_t enforced_max_line_blen,uint32_t dst_capacity,char * dst,textFILE * txf_ptr)264 PglErr TextFileOpenEx(const char* fname, uint32_t enforced_max_line_blen, uint32_t dst_capacity, char* dst, textFILE* txf_ptr) {
265   return TextFileOpenInternal(fname, enforced_max_line_blen, dst_capacity, dst, GetTxfp(txf_ptr), nullptr);
266 }
267 
268 // Set enforced_max_line_blen == 0 in the token-reading case.
IsPathologicallyLongLineOrToken(const char * line_start,const char * load_start,const char * known_line_end,uint32_t enforced_max_line_blen)269 BoolErr IsPathologicallyLongLineOrToken(const char* line_start, const char* load_start, const char* known_line_end, uint32_t enforced_max_line_blen) {
270   if (enforced_max_line_blen) {
271     // Preconditions:
272     // * No \n in [line_start, load_start).
273     // * (known_line_end - load_start) is usually <= enforced_max_line_blen,
274     //   and never much larger.  Not a hard requirement, but it's better to
275     //   enforce the line-length limit during line iteration outside this
276     //   regime to avoid duplicating work.
277     if (S_CAST(uintptr_t, known_line_end - line_start) <= enforced_max_line_blen) {
278       return 0;
279     }
280     const uint32_t already_scanned_byte_ct = load_start - line_start;
281     if (unlikely(already_scanned_byte_ct >= enforced_max_line_blen)) {
282       return 1;
283     }
284     const char* memchr_result = S_CAST(const char*, memchr(load_start, '\n', enforced_max_line_blen - already_scanned_byte_ct));
285     if (unlikely(!memchr_result)) {
286       return 1;
287     }
288     // If we've found a line with terminal \n at or after this address, there
289     // are <= enforced_max_line_blen bytes left, so no remaining line can be
290     // longer.
291     const char* memchr_result_thresh = known_line_end - (enforced_max_line_blen + 1);
292     while (1) {
293       if (memchr_result >= memchr_result_thresh) {
294         return 0;
295       }
296       memchr_result = S_CAST(const char*, memchr(&(memchr_result[1]), '\n', enforced_max_line_blen));
297       if (unlikely(!memchr_result)) {
298         return 1;
299       }
300     }
301   }
302   if (S_CAST(uintptr_t, known_line_end - line_start) <= kMaxTokenBlen) {
303     return 0;
304   }
305   const uint32_t already_scanned_byte_ct = load_start - line_start;
306   if (unlikely(already_scanned_byte_ct >= kMaxTokenBlen)) {
307     return 1;
308   }
309   // No loop needed for now, since token-scanning buffer sizes are hardcoded.
310   //
311   // Replace with a forward-scanning version of this functionality when
312   // available ("FirstPostspaceBoundedFar"?)
313   return (LastSpaceOrEoln(load_start, kMaxTokenBlen - already_scanned_byte_ct) == nullptr);
314 }
315 
// Message reported by GzRawStreamRead() when the file ends while the inflate
// stream is still incomplete.
const char kShortErrRfileTruncatedGz[] = "GzRawStreamRead: gzipped file appears to be truncated";
317 
GzRawStreamRead(char * dst_end,FILE * ff,GzRawDecompressStream * gzp,char ** dst_iterp,const char ** errmsgp)318 PglErr GzRawStreamRead(char* dst_end, FILE* ff, GzRawDecompressStream* gzp, char** dst_iterp, const char** errmsgp) {
319   z_stream* dsp = &gzp->ds;
320   if ((!dsp->avail_in) && feof_unlocked(ff)) {
321     return kPglRetSuccess;
322   }
323   char* dst_iter = *dst_iterp;
324   do {
325     int zerr = Z_OK;
326     if (dsp->avail_in) {  // can be zero after TextRewind()
327       dsp->next_out = R_CAST(unsigned char*, dst_iter);
328       dsp->avail_out = dst_end - dst_iter;
329       zerr = inflate(dsp, Z_SYNC_FLUSH);
330       if (unlikely((zerr < 0) || (zerr == Z_NEED_DICT))) {
331         if (dsp->msg) {
332           *errmsgp = dsp->msg;
333         } else {
334           *errmsgp = zError(zerr);
335         }
336         return kPglRetDecompressFail;
337       }
338       dst_iter = R_CAST(char*, dsp->next_out);
339       if (dsp->avail_in) {
340         assert(dst_iter == dst_end);
341         break;
342       }
343     }
344     const uint32_t nbytes = fread_unlocked(gzp->in, 1, kDecompressChunkSize, ff);
345     dsp->next_in = gzp->in;
346     dsp->avail_in = nbytes;
347     if (!nbytes) {
348       if (unlikely(!feof_unlocked(ff))) {
349         *errmsgp = strerror(errno);
350         return kPglRetReadFail;
351       }
352       if (unlikely(zerr == Z_OK)) {
353         *errmsgp = kShortErrRfileTruncatedGz;
354         return kPglRetDecompressFail;
355       }
356       // Normal EOF.
357       break;
358     }
359   } while (dst_iter != dst_end);
360   *dst_iterp = dst_iter;
361   return kPglRetSuccess;
362 }
363 
ZstRawStreamRead(char * dst_end,FILE * ff,ZstRawDecompressStream * zstp,char ** dst_iterp,const char ** errmsgp)364 PglErr ZstRawStreamRead(char* dst_end, FILE* ff, ZstRawDecompressStream* zstp, char** dst_iterp, const char** errmsgp) {
365   if ((!zstp->ib.size) && feof_unlocked(ff)) {
366     return kPglRetSuccess;
367   }
368   // Sequentially dependent blocks limited to ~128 KiB.
369   char* dst_iter = *dst_iterp;
370   while (1) {
371     ZSTD_outBuffer zob = {R_CAST(unsigned char*, dst_iter), S_CAST(size_t, dst_end - dst_iter), 0};
372     // ib.size == 0 ok, no need to special-case rewind.
373     const uintptr_t read_size_hint = ZSTD_decompressStream(zstp->ds, &zob, &zstp->ib);
374     if (unlikely(ZSTD_isError(read_size_hint))) {
375       *errmsgp = ZSTD_getErrorName(read_size_hint);
376       return kPglRetDecompressFail;
377     }
378     dst_iter = &(dst_iter[zob.pos]);
379     if (dst_iter == dst_end) {
380       break;
381     }
382     // Decoder has flushed everything it could.  Either we're at EOF, or we
383     // must load more.
384     unsigned char* in = S_CAST(unsigned char*, K_CAST(void*, zstp->ib.src));
385     const uint32_t n_inbytes = zstp->ib.size - zstp->ib.pos;
386     memmove(in, &(in[zstp->ib.pos]), n_inbytes);
387     unsigned char* load_start = &(in[n_inbytes]);
388     const uint32_t nbytes = fread_unlocked(load_start, 1, kDecompressChunkSize - n_inbytes, ff);
389     if (unlikely(ferror_unlocked(ff))) {
390       *errmsgp = strerror(errno);
391       return kPglRetReadFail;
392     }
393     zstp->ib.pos = 0;
394     zstp->ib.size = nbytes + n_inbytes;
395     if (!nbytes) {
396       if (unlikely(n_inbytes)) {
397         *errmsgp = kShortErrZstdPrefixUnknown;
398         return kPglRetDecompressFail;
399       }
400       break;
401     }
402   }
403   *dst_iterp = dst_iter;
404   return kPglRetSuccess;
405 }
406 
// Messages stored in TextFileBase.errmsg together with kPglRetMalformedInput:
// the first by TextFileAdvance(), the second by TextFileOnlyEmptyLinesLeft().
const char kShortErrLongLine[] = "Pathologically long line";
const char kShortErrInteriorEmptyLine[] = "Unexpected interior empty line";
409 
TextFileAdvance(textFILE * txf_ptr)410 PglErr TextFileAdvance(textFILE* txf_ptr) {
411   textFILEMain* txfp = GetTxfp(txf_ptr);
412   TextFileBase* basep = &txfp->base;
413   if (basep->reterr) {
414     return basep->reterr;
415   }
416   PglErr reterr = kPglRetSuccess;
417   {
418     char* line_start = basep->consume_stop;
419     assert(basep->consume_iter == line_start);
420     char* dst = basep->dst;
421     char* dst_load_start;
422     while (1) {
423       const uint32_t dst_offset = line_start - dst;
424       const uint32_t dst_rem = basep->dst_len - dst_offset;
425       // (dst_rem guaranteed to be < basep->enforced_max_line_blen here, since
426       // otherwise we error out earlier.)
427       // Two cases:
428       // 1. Move (possibly empty) unfinished line to the beginning of the
429       //    buffer.
430       // 2. Resize the buffer/report out-of-memory.
431       if (dst_rem < basep->dst_capacity - kDecompressChunkSize) {
432         if (dst_offset) {
433           memmove(dst, line_start, dst_rem);
434         }
435       } else {
436         if (unlikely(basep->dst_owned_by_consumer)) {
437           goto TextFileAdvance_ret_NOMEM;
438         }
439         uint32_t next_dst_capacity = basep->enforced_max_line_blen + kDecompressChunkSize;
440         if ((next_dst_capacity / 2) > basep->dst_capacity) {
441           next_dst_capacity = basep->dst_capacity * 2;
442         }
443 #ifndef __LP64__
444         if (next_dst_capacity >= 0x80000000U) {
445           goto TextFileAdvance_ret_NOMEM;
446         }
447 #endif
448         char* dst_next;
449         if (!dst_offset) {
450           dst_next = S_CAST(char*, realloc(dst, next_dst_capacity));
451           if (unlikely(!dst_next)) {
452             goto TextFileAdvance_ret_NOMEM;
453           }
454         } else {
455           dst_next = S_CAST(char*, malloc(next_dst_capacity));
456           if (unlikely(!dst_next)) {
457             goto TextFileAdvance_ret_NOMEM;
458           }
459           memcpy(dst_next, line_start, dst_rem);
460         }
461         basep->dst = dst_next;
462         dst = dst_next;
463       }
464       line_start = dst;
465       dst_load_start = &(dst[dst_rem]);
466       FILE* ff = basep->ff;
467       char* dst_iter = dst_load_start;
468       // We don't want to always fill the entire buffer here.  The main plink2
469       // use case of textFILE is to just peek at an unknown-length header line
470       // with a maximal-length line-load buffer, compute a
471       // legitimate-line-length bound, and then (move-)construct a TextStream
472       // with the shorter buffer size.
473       // Instead, we load up to the smallest power of 2 >= (dst_rem + 1 MiB).
474       uintptr_t stop_offset = (2 * k1LU) << bsru32(dst_rem + kDecompressChunkSize - 1);
475       if (stop_offset > basep->dst_capacity) {
476         stop_offset = basep->dst_capacity;
477       }
478       char* dst_stop = &(dst[stop_offset]);
479       basep->consume_iter = dst;
480       switch (basep->file_type) {
481       case kFileUncompressed:
482         {
483           uint32_t rlen = dst_stop - dst_iter;
484           if (rlen > kMaxBytesPerIO) {
485             // We need to know how many bytes were read, so fread_checked()
486             // doesn't work.
487             // This is an if-statement instead of a while loop since rlen can
488             // never be larger than 2 * kMaxBytesPerIO.
489             const uint32_t nbytes = fread_unlocked(dst_iter, 1, kMaxBytesPerIO, ff);
490             if (nbytes < kMaxBytesPerIO) {
491               if (unlikely(ferror_unlocked(ff))) {
492                 goto TextFileAdvance_ret_READ_FAIL;
493               }
494               basep->dst_len = nbytes + dst_rem;
495               break;
496             }
497             rlen -= kMaxBytesPerIO;
498             dst_iter = &(dst_iter[kMaxBytesPerIO]);
499           }
500           const uint32_t nbytes = fread_unlocked(dst_iter, 1, rlen, ff);
501           if (unlikely(ferror_unlocked(ff))) {
502             goto TextFileAdvance_ret_READ_FAIL;
503           }
504           dst_iter = &(dst_iter[nbytes]);
505           break;
506         }
507       case kFileGzip:
508         {
509           reterr = GzRawStreamRead(dst_stop, ff, &txfp->rds.gz, &dst_iter, &basep->errmsg);
510           if (unlikely(reterr)) {
511             goto TextFileAdvance_ret_1;
512           }
513           break;
514         }
515       case kFileBgzf:
516         {
517           // Fully independent blocks limited to 64 KiB.
518           // probable todo: move this to a BgzfRawStreamRead() function in
519           // plink2_bgzf (and move ZstRawStreamRead() to plink2_zstfile).
520           BgzfRawDecompressStream* bgzfp = &txfp->rds.bgzf;
521           if ((!bgzfp->in_size) && feof_unlocked(ff)) {
522             break;
523           }
524           struct libdeflate_decompressor* ldc = bgzfp->ldc;
525           unsigned char* in = bgzfp->in;
526           unsigned char* in_iter = &(in[bgzfp->in_pos]);
527           unsigned char* in_end = &(in[bgzfp->in_size]);
528           while (1) {
529             uint32_t n_inbytes = in_end - in_iter;
530             if (n_inbytes > 25) {
531               if (unlikely(!IsBgzfHeader(in_iter))) {
532                 goto TextFileAdvance_ret_INVALID_BGZF;
533               }
534 #  ifdef __arm__
535 #    error "Unaligned accesses in TextFileAdvance()."
536 #  endif
537               const uint32_t bsize_minus1 = *R_CAST(uint16_t*, &(in_iter[16]));
538               if (unlikely(bsize_minus1 < 25)) {
539                 goto TextFileAdvance_ret_INVALID_BGZF;
540               }
541               if (bsize_minus1 < n_inbytes) {
542                 // We have at least one fully-loaded compressed block.
543                 // Decompress it if we have enough space.
544                 const uint32_t in_size = bsize_minus1 - 25;
545                 const uint32_t out_size = *R_CAST(uint32_t*, &(in_iter[in_size + 22]));
546                 if (unlikely(out_size > 65536)) {
547                   goto TextFileAdvance_ret_INVALID_BGZF;
548                 }
549                 if (out_size > S_CAST(uintptr_t, dst_stop - dst_iter)) {
550                   break;
551                 }
552                 if (unlikely(libdeflate_deflate_decompress(ldc, &(in_iter[18]), in_size, dst_iter, out_size, nullptr))) {
553                   goto TextFileAdvance_ret_INVALID_BGZF;
554                 }
555                 in_iter = &(in_iter[bsize_minus1 + 1]);
556                 dst_iter = &(dst_iter[out_size]);
557                 continue;
558               }
559             }
560             // Either we're at EOF, or we must load more.
561             memmove(in, in_iter, n_inbytes);
562             unsigned char* load_start = &(in[n_inbytes]);
563             const uint32_t nbytes = fread_unlocked(load_start, 1, kDecompressChunkSize - n_inbytes, ff);
564             if (unlikely(ferror_unlocked(ff))) {
565               goto TextFileAdvance_ret_READ_FAIL;
566             }
567             in_iter = in;
568             in_end = &(load_start[nbytes]);
569             bgzfp->in_size = in_end - in;
570             if (!nbytes) {
571               if (unlikely(n_inbytes)) {
572                 goto TextFileAdvance_ret_INVALID_BGZF;
573               }
574               break;
575             }
576           }
577           bgzfp->in_pos = in_iter - in;
578           dst_stop = dst_iter;
579           break;
580         }
581       case kFileZstd:
582         {
583           reterr = ZstRawStreamRead(dst_stop, ff, &txfp->rds.zst, &dst_iter, &basep->errmsg);
584           if (unlikely(reterr)) {
585             goto TextFileAdvance_ret_1;
586           }
587           break;
588         }
589       }
590       basep->dst_len = dst_iter - dst;
591       if (!basep->dst_len) {
592         goto TextFileAdvance_ret_EOF;
593       }
594       if (dst_iter != dst_stop) {
595         // If last character of file isn't a newline, append one to simplify
596         // downstream code.
597         if (dst_iter[-1] != '\n') {
598           *dst_iter++ = '\n';
599           basep->dst_len += 1;
600         }
601         basep->consume_stop = dst_iter;
602         break;
603       }
604       char* last_byte_ptr = Memrchr(dst_load_start, '\n', dst_iter - dst_load_start);
605       if (last_byte_ptr) {
606         basep->consume_stop = &(last_byte_ptr[1]);
607         break;
608       }
609       // Buffer is full, and no '\n' is present.  Restart the loop and try to
610       // load more data (extending the buffer if necessary), if we aren't
611       // already at/past the line-length limit.
612       if (basep->dst_len >= basep->enforced_max_line_blen) {
613         goto TextFileAdvance_ret_LONG_LINE;
614       }
615     }
616     if (unlikely(IsPathologicallyLongLineOrToken(dst, dst_load_start, basep->consume_stop, basep->enforced_max_line_blen))) {
617       goto TextFileAdvance_ret_LONG_LINE;
618     }
619   }
620   while (0) {
621   TextFileAdvance_ret_NOMEM:
622     reterr = kPglRetNomem;
623     break;
624   TextFileAdvance_ret_READ_FAIL:
625     reterr = kPglRetReadFail;
626     basep->errmsg = strerror(errno);
627     break;
628   TextFileAdvance_ret_LONG_LINE:
629     basep->errmsg = kShortErrLongLine;
630     reterr = kPglRetMalformedInput;
631     break;
632   TextFileAdvance_ret_INVALID_BGZF:
633     basep->errmsg = kShortErrInvalidBgzf;
634     reterr = kPglRetDecompressFail;
635     break;
636   TextFileAdvance_ret_EOF:
637     reterr = kPglRetEof;
638     break;
639   }
640  TextFileAdvance_ret_1:
641   basep->reterr = reterr;
642   return reterr;
643 }
644 
TextFileOnlyEmptyLinesLeft(textFILE * txf_ptr)645 PglErr TextFileOnlyEmptyLinesLeft(textFILE* txf_ptr) {
646   TextFileBase* basep = &GetTxfp(txf_ptr)->base;
647   char* line_start = basep->consume_iter;
648   while (1) {
649     if (line_start == basep->consume_stop) {
650       basep->consume_iter = line_start;
651       PglErr reterr = TextFileAdvance(txf_ptr);
652       if (reterr) {
653         return reterr;
654       }
655       line_start = basep->consume_iter;
656     }
657     line_start = FirstNonTspace(line_start);
658     if (unlikely(!IsEolnKns(*line_start))) {
659       basep->reterr = kPglRetMalformedInput;
660       basep->errmsg = kShortErrInteriorEmptyLine;
661       return kPglRetMalformedInput;
662     }
663     line_start = AdvPastDelim(line_start, '\n');
664   }
665 }
666 
TextFileRewind(textFILE * txf_ptr)667 void TextFileRewind(textFILE* txf_ptr) {
668   textFILEMain* txfp = GetTxfp(txf_ptr);
669   TextFileBase* basep = &txfp->base;
670   if ((!basep->ff) || ((basep->reterr) && (basep->reterr != kPglRetEof))) {
671     return;
672   }
673   rewind(basep->ff);
674   basep->reterr = kPglRetSuccess;
675   basep->dst_len = 0;
676   basep->consume_iter = basep->dst;
677   basep->consume_stop = basep->dst;
678   if (basep->file_type != kFileUncompressed) {
679     if (basep->file_type == kFileGzip) {
680       txfp->rds.gz.ds.avail_in = 0;
681 #ifdef NDEBUG
682       inflateReset(&txfp->rds.gz.ds);
683 #else
684       const int errcode = inflateReset(&txfp->rds.gz.ds);
685       assert(errcode == Z_OK);
686 #endif
687     } else if (basep->file_type == kFileBgzf) {
688       txfp->rds.bgzf.in_size = 0;
689       txfp->rds.bgzf.in_pos = 0;
690     } else {
691       // kFileZstd
692       txfp->rds.zst.ib.size = 0;
693       txfp->rds.zst.ib.pos = 0;
694       ZSTD_DCtx_reset(txfp->rds.zst.ds, ZSTD_reset_session_only);
695     }
696   }
697 }
698 
CleanupTextFile(textFILE * txf_ptr,PglErr * reterrp)699 BoolErr CleanupTextFile(textFILE* txf_ptr, PglErr* reterrp) {
700   textFILEMain* txfp = GetTxfp(txf_ptr);
701   TextFileBase* basep = &txfp->base;
702   basep->consume_iter = nullptr;
703   basep->consume_stop = nullptr;
704   basep->reterr = kPglRetEof;
705   basep->errmsg = nullptr;
706   if (basep->dst && (!basep->dst_owned_by_consumer)) {
707     free(basep->dst);
708     basep->dst = nullptr;
709   }
710   if (basep->ff) {
711     if (basep->file_type != kFileUncompressed) {
712       if (basep->file_type == kFileZstd) {
713         if (txfp->rds.zst.ib.src) {
714           free_const(txfp->rds.zst.ib.src);
715           txfp->rds.zst.ib.src = nullptr;
716         }
717         if (txfp->rds.zst.ds) {
718           ZSTD_freeDStream(txfp->rds.zst.ds);
719           txfp->rds.zst.ds = nullptr;
720         }
721       } else if (basep->file_type == kFileBgzf) {
722         if (txfp->rds.bgzf.in) {
723           free(txfp->rds.bgzf.in);
724           txfp->rds.bgzf.in = nullptr;
725         }
726         if (txfp->rds.bgzf.ldc) {
727           libdeflate_free_decompressor(txfp->rds.bgzf.ldc);
728           txfp->rds.bgzf.ldc = nullptr;
729         }
730       } else {
731         // plain gzip
732         if (txfp->rds.gz.in) {
733           free(txfp->rds.gz.in);
734           txfp->rds.gz.in = nullptr;
735         }
736         if (txfp->rds.gz.ds_initialized) {
737           inflateEnd(&txfp->rds.gz.ds);
738         }
739       }
740     }
741     if (unlikely(fclose_null(&basep->ff))) {
742       if (!reterrp) {
743         return 1;
744       }
745       if (*reterrp == kPglRetSuccess) {
746         *reterrp = kPglRetReadFail;
747         return 1;
748       }
749     }
750   }
751   return 0;
752 }
753 
754 
PreinitTextStream(TextStream * txs_ptr)755 void PreinitTextStream(TextStream* txs_ptr) {
756   TextStreamMain* txsp = GetTxsp(txs_ptr);
757   EraseTextFileBase(&txsp->base);
758   txsp->syncp = nullptr;
759 }
760 
761 // This type of code is especially bug-prone (ESR would call it a "defect
762 // attractor").  Goal is to get it right, and fast enough to be a major win
763 // over gzgets()... and then not worry about it again for years.
TextStreamThread(void * raw_arg)764 THREAD_FUNC_DECL TextStreamThread(void* raw_arg) {
765   TextStreamMain* context = S_CAST(TextStreamMain*, raw_arg);
766   TextFileBase* basep = &context->base;
767   TextStreamSync* syncp = context->syncp;
768   FileCompressionType file_type = basep->file_type;
769   RawMtDecompressStream* rdsp = &context->rds;
770   FILE* ff = basep->ff;
771   char* buf = basep->dst;
772   char* buf_end = &(buf[basep->dst_capacity]);
773   char* cur_block_start = basep->consume_stop;
774   char* read_head = &(buf[basep->dst_len]);
775 
776   // We can either be reading/decompressing into memory past the bytes passed
777   // to the consumer, or we can be doing it before those bytes.
778   // In the first case, read_stop is buf_end, but it gets changed to the
779   // latest value of consume_tail when we return to the front of the buffer.
780   // In the second case, read_stop is the position of the first passed byte.
781   char* read_stop = buf_end;
782 #ifdef _WIN32
783   CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
784   HANDLE reader_progress_event = syncp->reader_progress_event;
785   HANDLE consumer_progress_event = syncp->consumer_progress_event;
786 #else
787   pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
788   pthread_cond_t* reader_progress_condvarp = &syncp->reader_progress_condvar;
789   pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
790 #endif
791   const uint32_t enforced_max_line_blen = basep->enforced_max_line_blen;
792   const char* new_fname = nullptr;
793   const uint32_t is_token_stream = (enforced_max_line_blen == 0);
794   while (1) {
795     TxsInterrupt interrupt = kTxsInterruptNone;
796     PglErr reterr;
797     TxsInterrupt min_interrupt;
798     while (1) {
799       uintptr_t read_attempt_size = read_stop - read_head;
800       if (!read_attempt_size) {
801         const uint32_t memmove_required = (read_stop == buf_end);
802         if (unlikely((cur_block_start == buf) && memmove_required)) {
803           // May need to modify this predicate if we ever allow is_token_stream
804           // && !dst_owned_by_consumer.
805           const uint32_t prev_capacity = buf_end - buf;
806           if (basep->dst_owned_by_consumer || (prev_capacity >= enforced_max_line_blen)) {
807             goto TextStreamThread_LONG_LINE;
808           }
809           // Try to expand buffer.
810           uint32_t next_dst_capacity = enforced_max_line_blen + kDecompressChunkSize;
811           if ((next_dst_capacity / 2) > basep->dst_capacity) {
812             next_dst_capacity = basep->dst_capacity * 2;
813           }
814 #ifndef __LP64__
815           if (next_dst_capacity >= 0x80000000U) {
816             goto TextStreamThread_NOMEM;
817           }
818 #endif
819           char* dst_next = S_CAST(char*, realloc(buf, next_dst_capacity));
820           if (unlikely(!dst_next)) {
821             goto TextStreamThread_NOMEM;
822           }
823 #ifdef _WIN32
824           EnterCriticalSection(critical_sectionp);
825 #else
826           pthread_mutex_lock(sync_mutexp);
827 #endif
828           basep->dst = dst_next;
829           basep->dst_capacity = next_dst_capacity;
830           syncp->consume_tail = dst_next;
831           syncp->available_end = dst_next;
832           syncp->dst_reallocated = 1;
833 #ifdef _WIN32
834           LeaveCriticalSection(critical_sectionp);
835 #else
836           pthread_mutex_unlock(sync_mutexp);
837 #endif
838           buf = dst_next;
839           buf_end = &(buf[next_dst_capacity]);
840           cur_block_start = buf;
841           read_head = &(buf[prev_capacity]);
842           read_stop = &(buf[next_dst_capacity]);
843           continue;
844         }
845         // We cannot continue reading forward.  Cases:
846         // 1. read_stop == buf_end, cur_block_start != buf.  This means we're
847         //    in the middle of reading/decompressing a long line, and want to
848         //    wait for consume_tail == cur_block_start, so we can memmove all
849         //    the bytes back and continue reading forward.  (Tried
850         //    relaxing this to
851         //      consume_tail >= (buf_end - cur_block_start) + margin
852         //    for various values of margin, but that didn't make a meaningful
853         //    difference.)
854         // 2. read_stop == buf_end, cur_block_start == buf.  We failed with a
855         //    long-line error here.
856         // 3. read_stop < buf_end (usual case).  This means the consumer may
857         //    not be done handling some bytes-in-front we handed off earlier.
858         //    We are waiting for consume_tail <= cur_block_start, which means
859         //    all bytes in front have been consumed and we're free to continue
860         //    reading forward.
861         char* latest_consume_tail;
862 #ifdef _WIN32
863         // bugfix (7 May 2018): when consumer thread is waiting with
864         // syncp->consume_tail == cur_block_start, read_stop is near but not at
865         // buf_end, and there's no '\n' in the subsequent read, we can reach
866         // here a second time without releasing the consumer, so we'd enter
867         // deadlock if we unconditionally wait on consumer_progress_event (and
868         // in the Linux/OS X case, we'd be waiting for a spurious wakeup to
869         // save us).
870         // However, if memmove_required isn't true, we have to wait first; see
871         // the 21 Mar bugfix.
872         if (!memmove_required) {
873           goto TextStreamThread_wait_first;
874         }
875         while (1) {
876           EnterCriticalSection(critical_sectionp);
877           interrupt = syncp->interrupt;
878           if (interrupt != kTxsInterruptNone) {
879             goto TextStreamThread_INTERRUPT;
880           }
881           latest_consume_tail = syncp->consume_tail;
882           if (memmove_required) {
883             if (latest_consume_tail == cur_block_start) {
884               syncp->consume_tail = buf;
885               syncp->available_end = buf;
886               break;
887             }
888           } else if (latest_consume_tail <= cur_block_start) {
889             break;
890           }
891           LeaveCriticalSection(critical_sectionp);
892         TextStreamThread_wait_first:
893           WaitForSingleObject(consumer_progress_event, INFINITE);
894         }
895         // bugfix (23 Mar 2018): didn't always leave the critical section
896         LeaveCriticalSection(critical_sectionp);
897 #else
898         pthread_mutex_lock(sync_mutexp);
899         if (!memmove_required) {
900           // Wait for all bytes in front of read_stop to be consumed.
901           goto TextStreamThread_wait_first;
902         }
903         while (1) {
904           interrupt = syncp->interrupt;
905           if (interrupt != kTxsInterruptNone) {
906             goto TextStreamThread_INTERRUPT;
907           }
908           latest_consume_tail = syncp->consume_tail;
909           if (memmove_required) {
910             if (latest_consume_tail == cur_block_start) {
911               // All bytes have been consumed; memmove is now safe.
912               // bugfix (2 Oct 2018): Previously, this just set
913               // syncp->cur_circular_end = cur_block_start, but that created
914               // TWO consume_iter == available_end == cur_circular_end cases,
915               // one of which was handled incorrectly.
916               syncp->consume_tail = buf;
917               syncp->available_end = buf;
918               break;
919             }
920             // There are bytes behind cur_block_start that haven't been
921             // consumed yet.  This is possible on the first iteration through
922             // the loop, since consumer_progress_state may have been set for a
923             // reason we aren't interested in.
924 
925           } else if (latest_consume_tail <= cur_block_start) {
926             // All bytes in front of read_stop have been consumed.
927             break;
928           }
929         TextStreamThread_wait_first:
930           while (!syncp->consumer_progress_state) {
931             pthread_cond_wait(consumer_progress_condvarp, sync_mutexp);
932           }
933           syncp->consumer_progress_state = 0;
934         }
935         pthread_mutex_unlock(sync_mutexp);
936 #endif
937         if (read_stop == buf_end) {
938           const uint32_t cur_memmove_len = buf_end - cur_block_start;
939           memmove(buf, cur_block_start, cur_memmove_len);
940           cur_block_start = buf;
941           read_head = &(buf[cur_memmove_len]);
942         } else {
943           read_stop = buf_end;
944         }
945         continue;
946       }
947       if (read_attempt_size > kDecompressChunkSize) {
948         read_attempt_size = kDecompressChunkSize;
949       }
950       char* cur_read_end = read_head;
951       char* cur_read_stop = &(read_head[read_attempt_size]);
952       switch (file_type) {
953       case kFileUncompressed:
954         {
955           cur_read_end += fread_unlocked(read_head, 1, read_attempt_size, ff);
956           if (unlikely(ferror_unlocked(ff))) {
957             goto TextStreamThread_READ_FAIL;
958           }
959           break;
960         }
961       case kFileGzip:
962         {
963           reterr = GzRawStreamRead(cur_read_stop, ff, &rdsp->gz, &cur_read_end, &syncp->errmsg);
964           if (unlikely(reterr)) {
965             goto TextStreamThread_MISC_FAIL;
966           }
967           break;
968         }
969       case kFileBgzf:
970         {
971           reterr = BgzfRawMtStreamRead(R_CAST(unsigned char*, cur_read_stop), &rdsp->bgzf, R_CAST(unsigned char**, &cur_read_end), &syncp->errmsg);
972           if (unlikely(reterr)) {
973             goto TextStreamThread_MISC_FAIL;
974           }
975           break;
976         }
977       case kFileZstd:
978         {
979           reterr = ZstRawStreamRead(cur_read_stop, ff, &rdsp->zst, &cur_read_end, &syncp->errmsg);
980           if (unlikely(reterr)) {
981             goto TextStreamThread_MISC_FAIL;
982           }
983           break;
984         }
985       }
986       if (cur_read_end < cur_read_stop) {
987         char* final_read_head = cur_read_end;
988         if (cur_block_start != final_read_head) {
989           if (final_read_head[-1] != '\n') {
990             // Append '\n' so consumer can always use rawmemchr(., '\n') to
991             // find the end of the current line.
992             *final_read_head++ = '\n';
993           }
994         }
995         // Still want to consistently enforce max line/token length.
996         if (unlikely(IsPathologicallyLongLineOrToken(cur_block_start, read_head, final_read_head, enforced_max_line_blen))) {
997           goto TextStreamThread_LONG_LINE;
998         }
999         read_head = final_read_head;
1000         goto TextStreamThread_EOF;
1001       }
1002       char* last_byte_ptr;
1003       if (!is_token_stream) {
1004         last_byte_ptr = Memrchr(read_head, '\n', read_attempt_size);
1005       } else {
1006         last_byte_ptr = LastSpaceOrEoln(read_head, read_attempt_size);
1007       }
1008       if (last_byte_ptr) {
1009         char* next_available_end = &(last_byte_ptr[1]);
1010         if (unlikely(IsPathologicallyLongLineOrToken(cur_block_start, read_head, next_available_end, enforced_max_line_blen))) {
1011           goto TextStreamThread_LONG_LINE;
1012         }
1013 #ifdef _WIN32
1014         EnterCriticalSection(critical_sectionp);
1015 #else
1016         pthread_mutex_lock(sync_mutexp);
1017 #endif
1018         interrupt = syncp->interrupt;
1019         if (interrupt != kTxsInterruptNone) {
1020           goto TextStreamThread_INTERRUPT;
1021         }
1022         char* latest_consume_tail = syncp->consume_tail;
1023         const uint32_t all_later_bytes_consumed = (latest_consume_tail <= cur_block_start);
1024         const uint32_t return_to_start = all_later_bytes_consumed && (latest_consume_tail >= &(buf[kDecompressChunkSize]));
1025         if (return_to_start) {
1026           // bugfix (2 Oct 2018): This was previously setting
1027           // syncp->available_end = next_available_end too, and that was being
1028           // handled as a special case which conflicted with a rare legitimate
1029           // case.
1030           syncp->cur_circular_end = next_available_end;
1031           syncp->available_end = buf;
1032         } else {
1033           syncp->available_end = next_available_end;
1034         }
1035 #ifdef _WIN32
1036         // bugfix (23 Mar 2018): this needs to be in the critical section,
1037         // otherwise there's a danger of this resetting legitimate progress
1038         ResetEvent(consumer_progress_event);
1039         SetEvent(reader_progress_event);
1040         LeaveCriticalSection(critical_sectionp);
1041 #else
1042         // bugfix (21 Mar 2018): must force consumer_progress_state to 0 (or
1043         // ResetEvent(consumer_progress_event); otherwise the other wait loop's
1044         // read_stop = buf_end assignment may occur before all later bytes are
1045         // actually consumed, in the next_available_end == latest_consume_tail
1046         // edge case.
1047         syncp->consumer_progress_state = 0;
1048         pthread_cond_signal(reader_progress_condvarp);
1049         pthread_mutex_unlock(sync_mutexp);
1050 #endif
1051         if (return_to_start) {
1052           // Best to return to the beginning of the buffer.
1053           // (Note that read_attempt_size is guaranteed to be
1054           // <= kDecompressChunkSize.)
1055           const uintptr_t trailing_byte_ct = cur_read_end - next_available_end;
1056           memcpy(buf, next_available_end, trailing_byte_ct);
1057           cur_block_start = buf;
1058           read_head = &(buf[trailing_byte_ct]);
1059           // May as well reduce false sharing risk.
1060           read_stop = R_CAST(char*, RoundDownPow2(R_CAST(uintptr_t, latest_consume_tail), kCacheline));
1061           continue;
1062         }
1063         if (all_later_bytes_consumed) {
1064           read_stop = buf_end;
1065         } else {
1066           read_stop = R_CAST(char*, RoundDownPow2(R_CAST(uintptr_t, latest_consume_tail), kCacheline));
1067         }
1068         cur_block_start = next_available_end;
1069       }
1070       read_head = cur_read_end;
1071     }
1072     while (0) {
1073     TextStreamThread_NOMEM:
1074       min_interrupt = kTxsInterruptShutdown;
1075       reterr = kPglRetNomem;
1076       break;
1077     TextStreamThread_OPEN_FAIL:
1078       min_interrupt = kTxsInterruptShutdown;
1079       syncp->errmsg = strerror(errno);
1080       reterr = kPglRetOpenFail;
1081       break;
1082     TextStreamThread_READ_FAIL:
1083       min_interrupt = kTxsInterruptShutdown;
1084       syncp->errmsg = strerror(errno);
1085       reterr = kPglRetReadFail;
1086       break;
1087     TextStreamThread_LONG_LINE:
1088       min_interrupt = kTxsInterruptShutdown;
1089       syncp->errmsg = kShortErrLongLine;
1090       reterr = kPglRetMalformedInput;
1091       break;
1092     TextStreamThread_EOF:
1093       min_interrupt = kTxsInterruptRetarget;
1094       reterr = kPglRetEof;
1095       break;
1096     TextStreamThread_MISC_FAIL:
1097       min_interrupt = kTxsInterruptShutdown;
1098       break;
1099     }
1100     // We need to wait for a message from the consumer before we can usefully
1101     // proceed.
1102     // More precisely:
1103     // * In the eof subcase, we're waiting for either a rewind or shutdown
1104     //   request.
1105     // * In the error subcase, we're just waiting for the shutdown request.
1106 
1107     // Pass the error code back.
1108 #ifdef _WIN32
1109     EnterCriticalSection(critical_sectionp);
1110 #else
1111     pthread_mutex_lock(sync_mutexp);
1112 #endif
1113     syncp->reterr = reterr;
1114     interrupt = syncp->interrupt;
1115     if (interrupt >= min_interrupt) {
1116       // It's our lucky day: we don't need to wait again.
1117       goto TextStreamThread_INTERRUPT;
1118     }
1119     if (reterr == kPglRetEof) {
1120       syncp->available_end = read_head;
1121     }
1122 #ifdef _WIN32
1123     SetEvent(reader_progress_event);
1124     LeaveCriticalSection(critical_sectionp);
1125     while (1) {
1126       WaitForSingleObject(consumer_progress_event, INFINITE);
1127       EnterCriticalSection(critical_sectionp);
1128       interrupt = syncp->interrupt;
1129       if (interrupt >= min_interrupt) {
1130         break;
1131       }
1132       LeaveCriticalSection(critical_sectionp);
1133     }
1134 #else
1135     pthread_cond_signal(reader_progress_condvarp);
1136     do {
1137       while (!syncp->consumer_progress_state) {
1138         pthread_cond_wait(consumer_progress_condvarp, sync_mutexp);
1139       }
1140       syncp->consumer_progress_state = 0;
1141       interrupt = syncp->interrupt;
1142     } while (interrupt < min_interrupt);
1143 #endif
1144   TextStreamThread_INTERRUPT:
1145     // must be in critical section here, or be holding the mutex.
1146     if (interrupt == kTxsInterruptRetarget) {
1147       new_fname = syncp->new_fname;
1148       syncp->interrupt = kTxsInterruptNone;
1149       syncp->reterr = kPglRetSuccess;
1150     }
1151 #ifdef _WIN32
1152     LeaveCriticalSection(critical_sectionp);
1153 #else
1154     pthread_mutex_unlock(sync_mutexp);
1155 #endif
1156     if (interrupt == kTxsInterruptShutdown) {
1157       // possible todo: close the file here
1158       THREAD_RETURN;
1159     }
1160     assert(interrupt == kTxsInterruptRetarget);
1161     read_head = buf;
1162     if (!new_fname) {
1163       if (file_type == kFileBgzf) {
1164         reterr = BgzfRawMtStreamRewind(&rdsp->bgzf, &syncp->errmsg);
1165         if (unlikely(reterr)) {
1166           goto TextStreamThread_MISC_FAIL;
1167         }
1168       } else {
1169         // See TextFileRewind().
1170         rewind(ff);
1171         if (file_type != kFileUncompressed) {
1172           if (file_type == kFileGzip) {
1173             rdsp->gz.ds.avail_in = 0;
1174 #ifdef NDEBUG
1175             inflateReset(&rdsp->gz.ds);
1176 #else
1177             const int errcode = inflateReset(&rdsp->gz.ds);
1178             assert(errcode == Z_OK);
1179 #endif
1180           } else {
1181             // kFileZstd
1182             rdsp->zst.ib.size = 0;
1183             rdsp->zst.ib.pos = 0;
1184             ZSTD_DCtx_reset(rdsp->zst.ds, ZSTD_reset_session_only);
1185           }
1186         }
1187       }
1188     } else {
1189       // Switch to another file, with less creation/destruction of resources.
1190       FILE* next_ff = fopen(new_fname, FOPEN_RB);
1191       if (unlikely(!next_ff)) {
1192         goto TextStreamThread_OPEN_FAIL;
1193       }
1194       // See TextFileOpenInternal().
1195       uint32_t nbytes = fread_unlocked(buf, 1, 16, next_ff);
1196       FileCompressionType next_file_type = kFileUncompressed;
1197       if (nbytes >= 4) {
1198         const uint32_t magic4 = *R_CAST(uint32_t*, buf);
1199         if (IsZstdFrame(magic4)) {
1200           next_file_type = kFileZstd;
1201         } else if ((magic4 << 8) == 0x088b1f00) {
1202           if ((nbytes == 16) && IsBgzfHeader(buf)) {
1203             next_file_type = kFileBgzf;
1204           } else {
1205             next_file_type = kFileGzip;
1206           }
1207         }
1208       }
1209       if (file_type != next_file_type) {
1210         // Destroy old type-specific resources, and allocate new ones.
1211         if (file_type == kFileGzip) {
1212           free(rdsp->gz.in);
1213           inflateEnd(&rdsp->gz.ds);
1214         } else if (file_type == kFileBgzf) {
1215           CleanupBgzfRawMtStream(&rdsp->bgzf);
1216         } else if (file_type == kFileZstd) {
1217           free_const(rdsp->zst.ib.src);
1218           ZSTD_freeDStream(rdsp->zst.ds);
1219         }
1220 
1221         if (unlikely(fclose(ff))) {
1222           fclose(next_ff);
1223           goto TextStreamThread_READ_FAIL;
1224         }
1225         ff = next_ff;
1226         basep->ff = ff;
1227         file_type = next_file_type;
1228         basep->file_type = file_type;
1229         switch (file_type) {
1230         case kFileUncompressed:
1231           read_head = &(read_head[nbytes]);
1232           break;
1233         case kFileGzip:
1234           if (unlikely(GzRawInit(buf, nbytes, &rdsp->gz))) {
1235             goto TextStreamThread_NOMEM;
1236           }
1237           break;
1238         case kFileBgzf:
1239           reterr = BgzfRawMtStreamInit(buf, context->decompress_thread_ct, ff, nullptr, &rdsp->bgzf, &syncp->errmsg);
1240           if (unlikely(reterr)) {
1241             goto TextStreamThread_MISC_FAIL;
1242           }
1243           // bugfix (5 Oct 2019): forgot this break
1244           break;
1245         case kFileZstd:
1246           if (unlikely(ZstRawInit(buf, nbytes, &rdsp->zst))) {
1247             goto TextStreamThread_NOMEM;
1248           }
1249           break;
1250         }
1251       } else {
1252         switch (file_type) {
1253         case kFileUncompressed:
1254           read_head = &(read_head[nbytes]);
1255           break;
1256           // Rest of this is similar to rewind.
1257         case kFileGzip:
1258           {
1259             GzRawDecompressStream* gzp = &rdsp->gz;
1260             z_stream* dsp = &gzp->ds;
1261 #ifdef NDEBUG
1262             inflateReset(dsp);
1263 #else
1264             const int errcode = inflateReset(dsp);
1265             assert(errcode == Z_OK);
1266 #endif
1267             memcpy(gzp->in, buf, nbytes);
1268             dsp->next_in = gzp->in;
1269             dsp->avail_in = nbytes;
1270             break;
1271           }
1272         case kFileBgzf:
1273           {
1274             reterr = BgzfRawMtStreamRetarget(buf, &rdsp->bgzf, next_ff, &syncp->errmsg);
1275             if (unlikely(reterr)) {
1276               fclose(next_ff);
1277               goto TextStreamThread_MISC_FAIL;
1278             }
1279             break;
1280           }
1281         case kFileZstd:
1282           {
1283             ZstRawDecompressStream* zstp = &rdsp->zst;
1284             ZSTD_DCtx_reset(zstp->ds, ZSTD_reset_session_only);
1285             memcpy(K_CAST(void*, zstp->ib.src), buf, nbytes);
1286             zstp->ib.size = nbytes;
1287             zstp->ib.pos = 0;
1288             break;
1289           }
1290         }
1291         if (unlikely(fclose(ff))) {
1292           fclose(next_ff);
1293           goto TextStreamThread_READ_FAIL;
1294         }
1295         ff = next_ff;
1296         basep->ff = ff;
1297       }
1298     }
1299     cur_block_start = buf;
1300     read_stop = buf_end;
1301   }
1302 }
1303 
1304 const char kShortErrRfileInvalid[] = "TextStreamOpenEx can't be called with a closed or error-state textFILE";
1305 
TextStreamOpenEx(const char * fname,uint32_t enforced_max_line_blen,uint32_t dst_capacity,uint32_t decompress_thread_ct,textFILE * txf_ptr,char * dst,TextStream * txs_ptr)1306 PglErr TextStreamOpenEx(const char* fname, uint32_t enforced_max_line_blen, uint32_t dst_capacity, uint32_t decompress_thread_ct, textFILE* txf_ptr, char* dst, TextStream* txs_ptr) {
1307   TextStreamMain* txsp = GetTxsp(txs_ptr);
1308   TextFileBase* txs_basep = &txsp->base;
1309   PglErr reterr = kPglRetSuccess;
1310   {
1311     txsp->decompress_thread_ct = decompress_thread_ct;
1312     if (txf_ptr) {
1313       // Move-construct (unless there was an error, or file is not opened)
1314       if (unlikely((!TextFileIsOpen(txf_ptr)) || TextFileErrcode(txf_ptr))) {
1315         reterr = kPglRetImproperFunctionCall;
1316         txs_basep->errmsg = kShortErrRfileInvalid;
1317         goto TextStreamOpenEx_ret_1;
1318       }
1319       if (unlikely(TextIsOpen(txs_ptr))) {
1320         reterr = kPglRetImproperFunctionCall;
1321         txs_basep->errmsg = kShortErrRfileAlreadyOpen;
1322         goto TextStreamOpenEx_ret_1;
1323       }
1324       textFILEMain* txfp = GetTxfp(txf_ptr);
1325       *txs_basep = txfp->base;  // struct copy
1326       // Simplify TextStreamThread() initialization.
1327       const uint32_t backfill_ct = txs_basep->consume_iter - txs_basep->dst;
1328       if (backfill_ct) {
1329         txs_basep->dst_len -= backfill_ct;
1330         memmove(txs_basep->dst, txs_basep->consume_iter, txs_basep->dst_len);
1331         txs_basep->consume_iter = txs_basep->dst;
1332         txs_basep->consume_stop -= backfill_ct;
1333       }
1334       txs_basep->enforced_max_line_blen = enforced_max_line_blen;
1335       assert(txs_basep->dst_len <= dst_capacity);
1336       txs_basep->dst_capacity = dst_capacity;
1337       reterr = txfp->base.reterr;
1338       const FileCompressionType file_type = txfp->base.file_type;
1339       if (file_type != kFileUncompressed) {
1340         if (file_type == kFileGzip) {
1341           txsp->rds.gz = txfp->rds.gz;
1342         } else if (file_type == kFileZstd) {
1343           txsp->rds.zst = txfp->rds.zst;
1344         } else {
1345           reterr = BgzfRawMtStreamInit(nullptr, decompress_thread_ct, txs_basep->ff, &txfp->rds.bgzf, &txsp->rds.bgzf, &txs_basep->errmsg);
1346           if (unlikely(reterr)) {
1347             EraseTextFileBase(&txfp->base);
1348             goto TextStreamOpenEx_ret_1;
1349           }
1350         }
1351       }
1352       EraseTextFileBase(&txfp->base);
1353     } else {
1354       reterr = TextFileOpenInternal(fname, enforced_max_line_blen, dst_capacity, dst, nullptr, txsp);
1355     }
1356     if (reterr) {
1357       if (reterr == kPglRetEof) {
1358         txs_basep->reterr = kPglRetEof;
1359         return kPglRetSuccess;
1360       }
1361       goto TextStreamOpenEx_ret_1;
1362     }
1363     assert(!txsp->syncp);
1364     TextStreamSync* syncp;
1365     if (unlikely(cachealigned_malloc(RoundUpPow2(sizeof(TextStreamSync), kCacheline), &syncp))) {
1366       goto TextStreamOpenEx_ret_NOMEM;
1367     }
1368     txsp->syncp = syncp;
1369     dst = txs_basep->dst;
1370     syncp->consume_tail = dst;
1371     syncp->cur_circular_end = nullptr;
1372     syncp->available_end = txs_basep->consume_stop;
1373     syncp->errmsg = nullptr;
1374     syncp->reterr = kPglRetSuccess;
1375     syncp->dst_reallocated = 0;
1376     syncp->interrupt = kTxsInterruptNone;
1377     syncp->new_fname = nullptr;
1378 #ifdef _WIN32
1379     syncp->read_thread = nullptr;
1380     // apparently this can raise a low-memory exception in older Windows
1381     // versions, but that's not really our problem.
1382     InitializeCriticalSection(&syncp->critical_section);
1383 
1384     syncp->reader_progress_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
1385     if (unlikely(!syncp->reader_progress_event)) {
1386       DeleteCriticalSection(&syncp->critical_section);
1387       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1388     }
1389     syncp->consumer_progress_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
1390     if (unlikely(!syncp->consumer_progress_event)) {
1391       DeleteCriticalSection(&syncp->critical_section);
1392       CloseHandle(syncp->reader_progress_event);
1393       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1394     }
1395     syncp->read_thread = R_CAST(HANDLE, _beginthreadex(nullptr, kDefaultThreadStack, TextStreamThread, txsp, 0, nullptr));
1396     if (unlikely(!syncp->read_thread)) {
1397       DeleteCriticalSection(&syncp->critical_section);
1398       CloseHandle(syncp->consumer_progress_event);
1399       CloseHandle(syncp->reader_progress_event);
1400       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1401     }
1402 #else
1403     syncp->sync_init_state = 0;
1404     if (unlikely(pthread_mutex_init(&syncp->sync_mutex, nullptr))) {
1405       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1406     }
1407     syncp->sync_init_state = 1;
1408     if (unlikely(pthread_cond_init(&syncp->reader_progress_condvar, nullptr))) {
1409       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1410     }
1411     syncp->sync_init_state = 2;
1412     syncp->consumer_progress_state = 0;
1413     if (unlikely(pthread_cond_init(&syncp->consumer_progress_condvar, nullptr))) {
1414       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1415     }
1416     syncp->sync_init_state = 3;
1417 #  ifndef __cplusplus
1418     pthread_attr_t smallstack_thread_attr;
1419     if (unlikely(pthread_attr_init(&smallstack_thread_attr))) {
1420       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1421     }
1422     pthread_attr_setstacksize(&smallstack_thread_attr, kDefaultThreadStack);
1423 #  endif
1424     if (unlikely(pthread_create(&syncp->read_thread,
1425 #  ifdef __cplusplus
1426                                 &g_thread_startup.smallstack_thread_attr,
1427 #  else
1428                                 &smallstack_thread_attr,
1429 #  endif
1430                                 TextStreamThread, txsp))) {
1431 #  ifndef __cplusplus
1432       pthread_attr_destroy(&smallstack_thread_attr);
1433 #  endif
1434       goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
1435     }
1436 #  ifndef __cplusplus
1437     pthread_attr_destroy(&smallstack_thread_attr);
1438 #  endif
1439     syncp->sync_init_state = 4;
1440 #endif
1441   }
1442   while (0) {
1443   TextStreamOpenEx_ret_NOMEM:
1444     reterr = kPglRetNomem;
1445     break;
1446   TextStreamOpenEx_ret_THREAD_CREATE_FAIL:
1447     reterr = kPglRetThreadCreateFail;
1448     break;
1449   }
1450  TextStreamOpenEx_ret_1:
1451   txs_basep->reterr = reterr;
1452   return reterr;
1453 }
1454 
TextDecompressThreadCt(const TextStream * txs_ptr)1455 uint32_t TextDecompressThreadCt(const TextStream* txs_ptr) {
1456   const TextStreamMain* txsp = GetTxspK(txs_ptr);
1457   FileCompressionType file_type = txsp->base.file_type;
1458   if (file_type == kFileUncompressed) {
1459     return 0;
1460   }
1461   if (file_type != kFileBgzf) {
1462     return 1;
1463   }
1464   return GetThreadCtTg(&txsp->rds.bgzf.tg);
1465 }
1466 
TextAdvance(TextStream * txs_ptr)1467 PglErr TextAdvance(TextStream* txs_ptr) {
1468   TextStreamMain* txsp = GetTxsp(txs_ptr);
1469   TextFileBase* basep = &txsp->base;
1470   char* consume_iter = basep->consume_iter;
1471   TextStreamSync* syncp = txsp->syncp;
1472 #ifdef _WIN32
1473   CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
1474   HANDLE consumer_progress_event = syncp->consumer_progress_event;
1475   while (1) {
1476     EnterCriticalSection(critical_sectionp);
1477     const PglErr reterr = syncp->reterr;
1478     if (unlikely((reterr != kPglRetSuccess) && (reterr != kPglRetEof))) {
1479       basep->errmsg = syncp->errmsg;
1480       LeaveCriticalSection(critical_sectionp);
1481       basep->reterr = reterr;
1482       // No need to set consumer_progress event here, just let the cleanup
1483       // routine take care of that.
1484       return reterr;
1485     }
1486     char* available_end = syncp->available_end;
1487     char* cur_circular_end = syncp->cur_circular_end;
1488     if (consume_iter == cur_circular_end) {
1489       char* buf = basep->dst;
1490       consume_iter = buf;
1491       basep->consume_iter = buf;
1492       cur_circular_end = nullptr;
1493       syncp->cur_circular_end = nullptr;
1494       if (consume_iter != available_end) {
1495         SetEvent(consumer_progress_event);
1496       }
1497     }
1498     if (syncp->dst_reallocated) {
1499       consume_iter = basep->dst;
1500       syncp->dst_reallocated = 0;
1501     }
1502     syncp->consume_tail = consume_iter;
1503     if ((consume_iter != available_end) || cur_circular_end) {
1504       if (cur_circular_end) {
1505         basep->consume_stop = cur_circular_end;
1506       } else {
1507         basep->consume_stop = available_end;
1508       }
1509       LeaveCriticalSection(critical_sectionp);
1510       // We could set the consumer_progress event here, but it's not really
1511       // necessary?
1512       // SetEvent(consumer_progress_event);
1513       return kPglRetSuccess;
1514     }
1515     SetEvent(consumer_progress_event);
1516     LeaveCriticalSection(critical_sectionp);
1517     // We've processed all the consume-ready bytes...
1518     if (reterr != kPglRetSuccess) {
1519       // ...and we're at eof.  Don't set consumer_progress event here; let that
1520       // wait until cleanup or rewind/retarget.
1521       basep->reterr = kPglRetEof;
1522       return kPglRetEof;
1523     }
1524     // ...and there's probably more.
1525     WaitForSingleObject(syncp->reader_progress_event, INFINITE);
1526     // bugfix (2 Oct 2018)
1527     consume_iter = syncp->consume_tail;
1528     basep->consume_iter = consume_iter;
1529   }
1530 #else
1531   pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
1532   pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
1533   pthread_cond_t* reader_progress_condvarp = &syncp->reader_progress_condvar;
1534   pthread_mutex_lock(sync_mutexp);
1535   while (1) {
1536     const PglErr reterr = syncp->reterr;
1537     if (unlikely((reterr != kPglRetSuccess) && (reterr != kPglRetEof))) {
1538       basep->errmsg = syncp->errmsg;
1539       pthread_mutex_unlock(sync_mutexp);
1540       basep->reterr = reterr;
1541       return reterr;
1542     }
1543     char* available_end = syncp->available_end;
1544     // bugfix (2 Oct 2018): There were TWO consume_iter == available_end ==
1545     // cur_circular_end cases.
1546     // printf("checking for more to consume: %lx %lx %lx\n", (uintptr_t)consume_iter, (uintptr_t)syncp->cur_circular_end, (uintptr_t)available_end);
1547     if (consume_iter == syncp->cur_circular_end) {
1548       char* buf = basep->dst;
1549       consume_iter = buf;
1550       basep->consume_iter = buf;
1551       syncp->cur_circular_end = nullptr;
1552       // File-reader could be waiting on either "all bytes in front have been
1553       // consumed, some bytes behind may remain" or "all bytes have been
1554       // consumed".  Signal in case it's the first.
1555       if (consume_iter != available_end) {
1556         syncp->consumer_progress_state = 1;
1557         pthread_cond_signal(consumer_progress_condvarp);
1558       }
1559     }
1560     if (syncp->dst_reallocated) {
1561       consume_iter = basep->dst;
1562       syncp->dst_reallocated = 0;
1563     }
1564     syncp->consume_tail = consume_iter;
1565     // If cur_circular_end is still non-null here, there must be bytes
1566     // available even when consume_iter == available_end.  (Is the latter
1567     // still possible?  Check this.)
1568     if ((consume_iter != available_end) || syncp->cur_circular_end) {
1569       if (syncp->cur_circular_end) {
1570         basep->consume_stop = syncp->cur_circular_end;
1571       } else {
1572         basep->consume_stop = available_end;
1573       }
1574       // pthread_cond_signal(consumer_progress_condvarp);
1575       pthread_mutex_unlock(sync_mutexp);
1576       // printf("consuming %lx..%lx\n", (uintptr_t)(*consume_iterp), (uintptr_t)rlsp->consume_stop);
1577       return kPglRetSuccess;
1578     }
1579     // We've processed all the consume-ready bytes...
1580     if (reterr != kPglRetSuccess) {
1581       // ...and we're at eof.
1582       pthread_mutex_unlock(sync_mutexp);
1583       basep->reterr = kPglRetEof;
1584       return kPglRetEof;
1585     }
1586     // ...and there's probably more.
1587     syncp->consumer_progress_state = 1;
1588     pthread_cond_signal(consumer_progress_condvarp);
1589     // no need for an explicit spurious-wakeup check, we'll check the progress
1590     // condition (available_end has advanced, or we have a read error) anyway
1591     // and get back here if it isn't satisfied
1592     pthread_cond_wait(reader_progress_condvarp, sync_mutexp);
1593     // bugfix (2 Oct 2018)
1594     consume_iter = syncp->consume_tail;
1595     basep->consume_iter = syncp->consume_tail;
1596   }
1597 #endif
1598 }
1599 
TextOnlyEmptyLinesLeft(TextStream * txs_ptr)1600 PglErr TextOnlyEmptyLinesLeft(TextStream* txs_ptr) {
1601   TextFileBase* basep = &GetTxsp(txs_ptr)->base;
1602   char* line_start = basep->consume_iter;
1603   while (1) {
1604     if (line_start == basep->consume_stop) {
1605       basep->consume_iter = line_start;
1606       PglErr reterr = TextAdvance(txs_ptr);
1607       if (reterr) {
1608         return reterr;
1609       }
1610       line_start = basep->consume_iter;
1611     }
1612     line_start = FirstNonTspace(line_start);
1613     if (unlikely(!IsEolnKns(*line_start))) {
1614       basep->reterr = kPglRetMalformedInput;
1615       basep->errmsg = kShortErrInteriorEmptyLine;
1616       return kPglRetMalformedInput;
1617     }
1618     line_start = AdvPastDelim(line_start, '\n');
1619   }
1620 }
1621 
TextSkipNz(uintptr_t skip_ct,TextStream * txs_ptr)1622 PglErr TextSkipNz(uintptr_t skip_ct, TextStream* txs_ptr) {
1623   TextFileBase* basep = &GetTxsp(txs_ptr)->base;
1624 #ifdef __LP64__
1625   char* consume_iter = basep->consume_iter;
1626   // Minor extension of AdvToNthDelimChecked().
1627   const VecUc vvec_all_lf = vecuc_set1('\n');
1628   while (1) {
1629     uintptr_t starting_addr = R_CAST(uintptr_t, consume_iter);
1630     VecUc* consume_viter = R_CAST(VecUc*, RoundDownPow2(starting_addr, kBytesPerVec));
1631     uintptr_t ending_addr = R_CAST(uintptr_t, basep->consume_stop);
1632     VecUc* consume_vstop = R_CAST(VecUc*, RoundDownPow2(ending_addr, kBytesPerVec));
1633     VecUc cur_vvec = *consume_viter;
1634     VecUc lf_vvec = (cur_vvec == vvec_all_lf);
1635     uint32_t lf_bytes = vecuc_movemask(lf_vvec);
1636     const uint32_t leading_byte_ct = starting_addr - R_CAST(uintptr_t, consume_viter);
1637     const uint32_t leading_mask = UINT32_MAX << leading_byte_ct;
1638     lf_bytes &= leading_mask;
1639     uint32_t cur_lf_ct;
1640     for (; consume_viter != consume_vstop; ) {
1641       cur_lf_ct = PopcountVec8thUint(lf_bytes);
1642       if (cur_lf_ct >= skip_ct) {
1643         goto TextSkipNz_finish;
1644       }
1645       skip_ct -= cur_lf_ct;
1646       // bugfix (28 Sep 2019): forgot to update cur_vvec/lf_vvec/lf_bytes?!
1647       ++consume_viter;
1648       cur_vvec = *consume_viter;
1649       lf_vvec = (cur_vvec == vvec_all_lf);
1650       lf_bytes = vecuc_movemask(lf_vvec);
1651     }
1652     lf_bytes &= (1U << (ending_addr % kBytesPerVec)) - 1;
1653     cur_lf_ct = PopcountVec8thUint(lf_bytes);
1654     if (cur_lf_ct >= skip_ct) {
1655     TextSkipNz_finish:
1656       lf_bytes = ClearBottomSetBits(skip_ct - 1, lf_bytes);
1657       const uint32_t byte_offset_in_vec = ctzu32(lf_bytes) + 1;
1658       const uintptr_t result_addr = R_CAST(uintptr_t, consume_viter) + byte_offset_in_vec;
1659       basep->consume_iter = R_CAST(char*, result_addr);
1660       return kPglRetSuccess;
1661     }
1662     skip_ct -= cur_lf_ct;
1663     // bugfix (30 Oct 2019)
1664     basep->consume_iter = basep->consume_stop;
1665     PglErr reterr = TextAdvance(txs_ptr);
1666     // not unlikely() due to eof
1667     if (reterr) {
1668       return reterr;
1669     }
1670     consume_iter = basep->consume_iter;
1671   }
1672 #else
1673   char* consume_iter = basep->consume_iter;
1674   char* consume_stop = basep->consume_stop;
1675   for (uintptr_t ulii = 0; ulii != skip_ct; ++ulii) {
1676     if (consume_iter == consume_stop) {
1677       basep->consume_iter = consume_iter;
1678       PglErr reterr = TextAdvance(txs_ptr);
1679       if (reterr) {
1680         return reterr;
1681       }
1682       consume_iter = basep->consume_iter;
1683       consume_stop = basep->consume_stop;
1684     }
1685     consume_iter = AdvPastDelim(consume_iter, '\n');
1686   }
1687   basep->consume_iter = consume_iter;
1688   return kPglRetSuccess;
1689 #endif
1690 }
1691 
TextRetarget(const char * new_fname,TextStream * txs_ptr)1692 PglErr TextRetarget(const char* new_fname, TextStream* txs_ptr) {
1693   TextStreamMain* txsp = GetTxsp(txs_ptr);
1694   TextFileBase* basep = &txsp->base;
1695   TextStreamSync* syncp = txsp->syncp;
1696 #ifdef _WIN32
1697   CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
1698   EnterCriticalSection(critical_sectionp);
1699   const PglErr reterr = syncp->reterr;
1700   if (reterr != kPglRetSuccess) {
1701     if (unlikely(reterr != kPglRetEof)) {
1702       basep->errmsg = syncp->errmsg;
1703       LeaveCriticalSection(critical_sectionp);
1704       basep->reterr = reterr;
1705       return reterr;
1706     }
1707     // clear eof
1708     syncp->reterr = kPglRetSuccess;
1709   }
1710   basep->reterr = kPglRetSuccess;
1711   // bugfix (5 Mar 2018): need to reset these here, can't wait for reader
1712   // thread to receive signal
1713   char* buf = basep->dst;
1714   syncp->consume_tail = buf;
1715   syncp->cur_circular_end = nullptr;
1716   syncp->available_end = buf;
1717   syncp->dst_reallocated = 0;
1718   syncp->interrupt = kTxsInterruptRetarget;
1719   // Could also just open the file in this function (before acquiring the
1720   // mutex) and pass a gzFile.  Advantages: nothing bad happens if new_fname
1721   // is overwritten before it's read, RLstreamErrPrint() no longer has to deal
1722   // with OpenFail error.  Disadvantage: peak resource usage is a bit higher if
1723   // we open the second file before closing the first one.  Advantages probably
1724   // outweigh disadvantages, but I'll wait till --pmerge development to make a
1725   // decision since that's the main function that actually cares.
1726   syncp->new_fname = new_fname;
1727   SetEvent(syncp->consumer_progress_event);
1728   LeaveCriticalSection(critical_sectionp);
1729 #else
1730   pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
1731   pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
1732   pthread_mutex_lock(sync_mutexp);
1733   const PglErr reterr = syncp->reterr;
1734   if (reterr != kPglRetSuccess) {
1735     if (unlikely(reterr != kPglRetEof)) {
1736       basep->errmsg = syncp->errmsg;
1737       pthread_mutex_unlock(sync_mutexp);
1738       basep->reterr = reterr;
1739       return reterr;
1740     }
1741     // clear eof
1742     syncp->reterr = kPglRetSuccess;
1743   }
1744   // bugfix (4 Oct 2019): also need to clear eof here
1745   basep->reterr = kPglRetSuccess;
1746   char* buf = basep->dst;
1747   syncp->consume_tail = buf;
1748   syncp->cur_circular_end = nullptr;
1749   syncp->available_end = buf;
1750   syncp->dst_reallocated = 0;
1751   syncp->interrupt = kTxsInterruptRetarget;
1752   syncp->new_fname = new_fname;
1753   syncp->consumer_progress_state = 1;
1754   pthread_cond_signal(consumer_progress_condvarp);
1755   pthread_mutex_unlock(sync_mutexp);
1756 #endif
1757   basep->consume_iter = buf;
1758   basep->consume_stop = buf;
1759   return kPglRetSuccess;
1760 }
1761 
CleanupTextStream(TextStream * txs_ptr,PglErr * reterrp)1762 BoolErr CleanupTextStream(TextStream* txs_ptr, PglErr* reterrp) {
1763   TextStreamMain* txsp = GetTxsp(txs_ptr);
1764   TextFileBase* basep = &txsp->base;
1765   TextStreamSync* syncp = txsp->syncp;
1766   if (syncp) {
1767 #ifdef _WIN32
1768     if (syncp->read_thread) {
1769       CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
1770       EnterCriticalSection(critical_sectionp);
1771       syncp->interrupt = kTxsInterruptShutdown;
1772       SetEvent(syncp->consumer_progress_event);
1773       LeaveCriticalSection(critical_sectionp);
1774       WaitForSingleObject(syncp->read_thread, INFINITE);
1775       DeleteCriticalSection(critical_sectionp);
1776       CloseHandle(syncp->consumer_progress_event);
1777       CloseHandle(syncp->reader_progress_event);
1778     }
1779 #else
1780     const uint32_t sync_init_state = syncp->sync_init_state;
1781     if (sync_init_state) {
1782       pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
1783       pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
1784       if (sync_init_state == 4) {
1785         pthread_mutex_lock(sync_mutexp);
1786         syncp->interrupt = kTxsInterruptShutdown;
1787         syncp->consumer_progress_state = 1;
1788         pthread_cond_signal(consumer_progress_condvarp);
1789         pthread_mutex_unlock(sync_mutexp);
1790         pthread_join(syncp->read_thread, nullptr);
1791       }
1792       pthread_mutex_destroy(sync_mutexp);
1793       if (sync_init_state > 1) {
1794         pthread_cond_destroy(&syncp->reader_progress_condvar);
1795         if (sync_init_state > 2) {
1796           pthread_cond_destroy(consumer_progress_condvarp);
1797         }
1798       }
1799     }
1800 #endif
1801     aligned_free(txsp->syncp);
1802     txsp->syncp = nullptr;
1803   }
1804   basep->consume_iter = nullptr;
1805   basep->consume_stop = nullptr;
1806   basep->reterr = kPglRetEof;
1807   basep->errmsg = nullptr;
1808   if (basep->dst && (!basep->dst_owned_by_consumer)) {
1809     free(basep->dst);
1810     basep->dst = nullptr;
1811   }
1812   if (basep->ff) {
1813     if (basep->file_type != kFileUncompressed) {
1814       if (basep->file_type == kFileZstd) {
1815         if (txsp->rds.zst.ib.src) {
1816           free_const(txsp->rds.zst.ib.src);
1817           txsp->rds.zst.ib.src = nullptr;
1818         }
1819         if (txsp->rds.zst.ds) {
1820           ZSTD_freeDStream(txsp->rds.zst.ds);
1821           txsp->rds.zst.ds = nullptr;
1822         }
1823       } else if (basep->file_type == kFileBgzf) {
1824         CleanupBgzfRawMtStream(&txsp->rds.bgzf);
1825       } else {
1826         // plain gzip
1827         if (txsp->rds.gz.in) {
1828           free(txsp->rds.gz.in);
1829           txsp->rds.gz.in = nullptr;
1830         }
1831         if (txsp->rds.gz.ds_initialized) {
1832           inflateEnd(&txsp->rds.gz.ds);
1833         }
1834       }
1835       basep->file_type = kFileUncompressed;
1836     }
1837     if (unlikely(fclose_null(&basep->ff))) {
1838       if (!reterrp) {
1839         return 1;
1840       }
1841       if (*reterrp == kPglRetSuccess) {
1842         // Note that we don't set basep->reterr or ->errmsg here.
1843         *reterrp = kPglRetReadFail;
1844         return 1;
1845       }
1846     }
1847   }
1848   return 0;
1849 }
1850 
1851 
TksNext(TokenStream * tksp,uint32_t shard_ct,char ** shard_boundaries)1852 PglErr TksNext(TokenStream* tksp, uint32_t shard_ct, char** shard_boundaries) {
1853   TextStreamMain* txsp = GetTxsp(&tksp->txs);
1854   txsp->base.consume_iter = txsp->base.consume_stop;
1855   PglErr reterr = TextAdvance(&(tksp->txs));
1856   if (reterr) { // not unlikely due to eof
1857     return reterr;
1858   }
1859   char* consume_iter = txsp->base.consume_iter;
1860   char* consume_stop = txsp->base.consume_stop;
1861   shard_boundaries[0] = consume_iter;
1862   shard_boundaries[shard_ct] = consume_stop;
1863   if (shard_ct > 1) {
1864     const uintptr_t shard_size_target = S_CAST(uintptr_t, consume_stop - consume_iter) / shard_ct;
1865     char* boundary_min = consume_iter;
1866     char* cur_boundary = consume_iter;
1867     for (uint32_t boundary_idx = 1; boundary_idx < shard_ct; ++boundary_idx) {
1868       boundary_min = &(boundary_min[shard_size_target]);
1869       if (boundary_min > cur_boundary) {
1870         // last character must be token separator
1871         cur_boundary = FirstSpaceOrEoln(boundary_min);
1872         ++cur_boundary;
1873       }
1874       shard_boundaries[boundary_idx] = cur_boundary;
1875     }
1876   }
1877   return kPglRetSuccess;
1878 }
1879 
1880 #ifdef __cplusplus
1881 }
1882 #endif
1883