1 // This library is part of PLINK 2.00, copyright (C) 2005-2020 Shaun Purcell,
2 // Christopher Chang.
3 //
4 // This library is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by the
6 // Free Software Foundation, either version 3 of the License, or (at your
7 // option) any later version.
8 //
9 // This library is distributed in the hope that it will be useful, but WITHOUT
10 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
12 // for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public License
15 // along with this library. If not, see <http://www.gnu.org/licenses/>.
16
17 #include <errno.h>
18 #include "plink2_text.h"
19
20 #ifdef __cplusplus
21 namespace plink2 {
22 #endif
23
GetTxfp(textFILE * txf_ptr)24 static inline textFILEMain* GetTxfp(textFILE* txf_ptr) {
25 return &GET_PRIVATE(*txf_ptr, m);
26 }
27
GetTxsp(TextStream * txs_ptr)28 static inline TextStreamMain* GetTxsp(TextStream* txs_ptr) {
29 return &GET_PRIVATE(*txs_ptr, m);
30 }
31
GetTxspK(const TextStream * txs_ptr)32 static inline const TextStreamMain* GetTxspK(const TextStream* txs_ptr) {
33 return &GET_PRIVATE(*txs_ptr, m);
34 }
35
GetFileType(const char * fname,FileCompressionType * ftype_ptr)36 PglErr GetFileType(const char* fname, FileCompressionType* ftype_ptr) {
37 FILE* infile = fopen(fname, FOPEN_RB);
38 if (unlikely(!infile)) {
39 // Note that this does not print an error message (since it may be called
40 // by a worker thread).
41 return kPglRetOpenFail;
42 }
43 unsigned char buf[16];
44 const uint32_t nbytes = fread_unlocked(buf, 1, 16, infile);
45 if (unlikely(ferror_unlocked(infile) || fclose(infile))) {
46 return kPglRetReadFail;
47 }
48 if (nbytes < 4) {
49 *ftype_ptr = kFileUncompressed;
50 return kPglRetSuccess;
51 }
52 uint32_t magic4;
53 memcpy(&magic4, buf, 4);
54
55 if (IsZstdFrame(magic4)) {
56 *ftype_ptr = kFileZstd;
57 return kPglRetSuccess;
58 }
59 if (S_CAST(uint16_t, magic4) != 0x8b1f) { // gzip ID1/ID2 bytes
60 *ftype_ptr = kFileUncompressed;
61 return kPglRetSuccess;
62 }
63 if ((nbytes == 16) && IsBgzfHeader(buf)) {
64 *ftype_ptr = kFileBgzf;
65 } else {
66 *ftype_ptr = kFileGzip;
67 }
68 return kPglRetSuccess;
69 }
70
// Resets a TextFileBase to the not-open state.  It has no file handle, no
// load buffer, and an empty consume range.  The sticky status is set to
// kPglRetEof so that read attempts return immediately.
void EraseTextFileBase(TextFileBase* trbp) {
  trbp->ff = nullptr;
  trbp->dst = nullptr;
  trbp->consume_iter = nullptr;
  trbp->consume_stop = nullptr;
  trbp->errmsg = nullptr;
  trbp->reterr = kPglRetEof;
}
79
// Puts a textFILE into the not-yet-opened state by erasing its base fields.
void PreinitTextFile(textFILE* txf_ptr) {
  textFILEMain* txfp = GetTxfp(txf_ptr);
  EraseTextFileBase(&txfp->base);
}
83
GzRawInit(const void * buf,uint32_t nbytes,GzRawDecompressStream * gzp)84 BoolErr GzRawInit(const void* buf, uint32_t nbytes, GzRawDecompressStream* gzp) {
85 gzp->ds_initialized = 0;
86 gzp->in = S_CAST(unsigned char*, malloc(kDecompressChunkSize));
87 if (!gzp->in) {
88 return 1;
89 }
90 z_stream* dsp = &gzp->ds;
91 memcpy(gzp->in, buf, nbytes);
92 dsp->next_in = gzp->in;
93 dsp->avail_in = nbytes;
94 dsp->zalloc = nullptr;
95 dsp->zfree = nullptr;
96 dsp->opaque = nullptr;
97 if (unlikely(inflateInit2(dsp, MAX_WBITS | 16) != Z_OK)) {
98 return 1;
99 }
100 gzp->ds_initialized = 1;
101 return 0;
102 }
103
ZstRawInit(const void * buf,uint32_t nbytes,ZstRawDecompressStream * zstp)104 BoolErr ZstRawInit(const void* buf, uint32_t nbytes, ZstRawDecompressStream* zstp) {
105 zstp->ib.src = malloc(kDecompressChunkSize);
106 if (unlikely(!zstp->ib.src)) {
107 zstp->ds = nullptr;
108 return 1;
109 }
110 zstp->ds = ZSTD_createDStream();
111 if (unlikely(!zstp->ds)) {
112 return 1;
113 }
114 memcpy(K_CAST(void*, zstp->ib.src), buf, nbytes);
115 zstp->ib.size = nbytes;
116 zstp->ib.pos = 0;
117 return 0;
118 }
119
// Short error messages stored in the reader's errmsg field when
// TextFileOpenInternal() rejects its arguments (reported with
// kPglRetImproperFunctionCall).
const char kShortErrRfileAlreadyOpen[] = "TextFileOpenInternal can't be called on an already-open file";
const char kShortErrRfileEnforcedMaxBlenTooSmall[] = "TextFileOpenInternal: enforced_max_line_blen too small (must be at least max(1 MiB, dst_capacity - 1 MiB))";
const char kShortErrRfileDstCapacityTooSmall[] = "TextFileOpenInternal: dst_capacity too small (2 MiB minimum)";
123
TextFileOpenInternal(const char * fname,uint32_t enforced_max_line_blen,uint32_t dst_capacity,char * dst,textFILEMain * txfp,TextStreamMain * txsp)124 PglErr TextFileOpenInternal(const char* fname, uint32_t enforced_max_line_blen, uint32_t dst_capacity, char* dst, textFILEMain* txfp, TextStreamMain* txsp) {
125 PglErr reterr = kPglRetSuccess;
126 TextFileBase* trbp;
127 if (txfp) {
128 trbp = &txfp->base;
129 } else {
130 trbp = &txsp->base;
131 }
132 {
133 // 1. Open file, get type.
134 if (unlikely(trbp->ff)) {
135 reterr = kPglRetImproperFunctionCall;
136 trbp->errmsg = kShortErrRfileAlreadyOpen;
137 goto TextFileOpenInternal_ret_1;
138 }
139 if (enforced_max_line_blen || txfp) {
140 if (unlikely(enforced_max_line_blen < kDecompressMinBlen)) {
141 reterr = kPglRetImproperFunctionCall;
142 trbp->errmsg = kShortErrRfileEnforcedMaxBlenTooSmall;
143 goto TextFileOpenInternal_ret_1;
144 }
145 if (dst) {
146 if (unlikely(dst_capacity < kDecompressMinCapacity)) {
147 reterr = kPglRetImproperFunctionCall;
148 trbp->errmsg = kShortErrRfileDstCapacityTooSmall;
149 goto TextFileOpenInternal_ret_1;
150 }
151 if (unlikely(enforced_max_line_blen + kDecompressChunkSize < dst_capacity)) {
152 reterr = kPglRetImproperFunctionCall;
153 trbp->errmsg = kShortErrRfileEnforcedMaxBlenTooSmall;
154 goto TextFileOpenInternal_ret_1;
155 }
156 }
157 } else {
158 // token-reading mode. dst == nullptr not currently supported.
159 assert(dst && (dst_capacity == kTokenStreamBlen));
160 }
161 trbp->ff = fopen(fname, FOPEN_RB);
162 if (unlikely(!trbp->ff)) {
163 goto TextFileOpenInternal_ret_OPEN_FAIL;
164 }
165 trbp->file_type = kFileUncompressed;
166 if (dst) {
167 trbp->dst_owned_by_consumer = 1;
168 trbp->dst_capacity = dst_capacity;
169 } else {
170 dst = S_CAST(char*, malloc(kDecompressMinCapacity));
171 if (unlikely(dst == nullptr)) {
172 goto TextFileOpenInternal_ret_NOMEM;
173 }
174 trbp->dst_owned_by_consumer = 0;
175 trbp->dst_capacity = kDecompressMinCapacity;
176 }
177 trbp->dst = dst;
178 uint32_t nbytes = fread_unlocked(dst, 1, 16, trbp->ff);
179 trbp->dst_len = nbytes;
180 trbp->enforced_max_line_blen = enforced_max_line_blen;
181 trbp->consume_iter = dst;
182 trbp->consume_stop = dst;
183 if (nbytes >= 4) {
184 const uint32_t magic4 = *R_CAST(uint32_t*, dst);
185 if (IsZstdFrame(magic4)) {
186 trbp->dst_len = 0;
187 trbp->file_type = kFileZstd;
188 ZstRawDecompressStream* zstp;
189 if (txfp) {
190 zstp = &txfp->rds.zst;
191 } else {
192 zstp = &txsp->rds.zst;
193 }
194 if (unlikely(ZstRawInit(dst, nbytes, zstp))) {
195 goto TextFileOpenInternal_ret_NOMEM;
196 }
197 } else if ((magic4 << 8) == 0x088b1f00) {
198 // gzip ID1/ID2 bytes, deflate compression method
199 trbp->dst_len = 0;
200 if ((nbytes == 16) && IsBgzfHeader(dst)) {
201 trbp->file_type = kFileBgzf;
202 if (txfp) {
203 BgzfRawDecompressStream* bgzfp = &txfp->rds.bgzf;
204 bgzfp->in = S_CAST(unsigned char*, malloc(kDecompressChunkSize));
205 if (unlikely(!bgzfp->in)) {
206 bgzfp->ldc = nullptr;
207 goto TextFileOpenInternal_ret_NOMEM;
208 }
209 bgzfp->ldc = libdeflate_alloc_decompressor();
210 if (!bgzfp->ldc) {
211 goto TextFileOpenInternal_ret_NOMEM;
212 }
213 memcpy(bgzfp->in, dst, nbytes);
214 bgzfp->in_size = nbytes;
215 bgzfp->in_pos = 0;
216 } else {
217 reterr = BgzfRawMtStreamInit(dst, txsp->decompress_thread_ct, trbp->ff, nullptr, &txsp->rds.bgzf, &trbp->errmsg);
218 if (unlikely(reterr)) {
219 goto TextFileOpenInternal_ret_1;
220 }
221 }
222 } else {
223 trbp->file_type = kFileGzip;
224 GzRawDecompressStream* gzp;
225 if (txfp) {
226 gzp = &txfp->rds.gz;
227 } else {
228 gzp = &txsp->rds.gz;
229 }
230 if (unlikely(GzRawInit(dst, nbytes, gzp))) {
231 goto TextFileOpenInternal_ret_NOMEM;
232 }
233 }
234 }
235 } else if (!nbytes) {
236 if (unlikely(!feof_unlocked(trbp->ff))) {
237 goto TextFileOpenInternal_ret_READ_FAIL;
238 }
239 // May as well accept this.
240 // Don't jump to ret_1 since we're setting txfp->reterr to a different
241 // value than we're returning.
242 trbp->reterr = kPglRetEof;
243 return kPglRetSuccess;
244 }
245 }
246 while (0) {
247 TextFileOpenInternal_ret_NOMEM:
248 reterr = kPglRetNomem;
249 break;
250 TextFileOpenInternal_ret_OPEN_FAIL:
251 reterr = kPglRetOpenFail;
252 trbp->errmsg = strerror(errno);
253 break;
254 TextFileOpenInternal_ret_READ_FAIL:
255 reterr = kPglRetReadFail;
256 trbp->errmsg = strerror(errno);
257 break;
258 }
259 TextFileOpenInternal_ret_1:
260 trbp->reterr = reterr;
261 return reterr;
262 }
263
TextFileOpenEx(const char * fname,uint32_t enforced_max_line_blen,uint32_t dst_capacity,char * dst,textFILE * txf_ptr)264 PglErr TextFileOpenEx(const char* fname, uint32_t enforced_max_line_blen, uint32_t dst_capacity, char* dst, textFILE* txf_ptr) {
265 return TextFileOpenInternal(fname, enforced_max_line_blen, dst_capacity, dst, GetTxfp(txf_ptr), nullptr);
266 }
267
268 // Set enforced_max_line_blen == 0 in the token-reading case.
IsPathologicallyLongLineOrToken(const char * line_start,const char * load_start,const char * known_line_end,uint32_t enforced_max_line_blen)269 BoolErr IsPathologicallyLongLineOrToken(const char* line_start, const char* load_start, const char* known_line_end, uint32_t enforced_max_line_blen) {
270 if (enforced_max_line_blen) {
271 // Preconditions:
272 // * No \n in [line_start, load_start).
273 // * (known_line_end - load_start) is usually <= enforced_max_line_blen,
274 // and never much larger. Not a hard requirement, but it's better to
275 // enforce the line-length limit during line iteration outside this
276 // regime to avoid duplicating work.
277 if (S_CAST(uintptr_t, known_line_end - line_start) <= enforced_max_line_blen) {
278 return 0;
279 }
280 const uint32_t already_scanned_byte_ct = load_start - line_start;
281 if (unlikely(already_scanned_byte_ct >= enforced_max_line_blen)) {
282 return 1;
283 }
284 const char* memchr_result = S_CAST(const char*, memchr(load_start, '\n', enforced_max_line_blen - already_scanned_byte_ct));
285 if (unlikely(!memchr_result)) {
286 return 1;
287 }
288 // If we've found a line with terminal \n at or after this address, there
289 // are <= enforced_max_line_blen bytes left, so no remaining line can be
290 // longer.
291 const char* memchr_result_thresh = known_line_end - (enforced_max_line_blen + 1);
292 while (1) {
293 if (memchr_result >= memchr_result_thresh) {
294 return 0;
295 }
296 memchr_result = S_CAST(const char*, memchr(&(memchr_result[1]), '\n', enforced_max_line_blen));
297 if (unlikely(!memchr_result)) {
298 return 1;
299 }
300 }
301 }
302 if (S_CAST(uintptr_t, known_line_end - line_start) <= kMaxTokenBlen) {
303 return 0;
304 }
305 const uint32_t already_scanned_byte_ct = load_start - line_start;
306 if (unlikely(already_scanned_byte_ct >= kMaxTokenBlen)) {
307 return 1;
308 }
309 // No loop needed for now, since token-scanning buffer sizes are hardcoded.
310 //
311 // Replace with a forward-scanning version of this functionality when
312 // available ("FirstPostspaceBoundedFar"?)
313 return (LastSpaceOrEoln(load_start, kMaxTokenBlen - already_scanned_byte_ct) == nullptr);
314 }
315
// Reported by GzRawStreamRead() (with kPglRetDecompressFail) when the input
// file ends before the gzip stream does.
const char kShortErrRfileTruncatedGz[] = "GzRawStreamRead: gzipped file appears to be truncated";
317
GzRawStreamRead(char * dst_end,FILE * ff,GzRawDecompressStream * gzp,char ** dst_iterp,const char ** errmsgp)318 PglErr GzRawStreamRead(char* dst_end, FILE* ff, GzRawDecompressStream* gzp, char** dst_iterp, const char** errmsgp) {
319 z_stream* dsp = &gzp->ds;
320 if ((!dsp->avail_in) && feof_unlocked(ff)) {
321 return kPglRetSuccess;
322 }
323 char* dst_iter = *dst_iterp;
324 do {
325 int zerr = Z_OK;
326 if (dsp->avail_in) { // can be zero after TextRewind()
327 dsp->next_out = R_CAST(unsigned char*, dst_iter);
328 dsp->avail_out = dst_end - dst_iter;
329 zerr = inflate(dsp, Z_SYNC_FLUSH);
330 if (unlikely((zerr < 0) || (zerr == Z_NEED_DICT))) {
331 if (dsp->msg) {
332 *errmsgp = dsp->msg;
333 } else {
334 *errmsgp = zError(zerr);
335 }
336 return kPglRetDecompressFail;
337 }
338 dst_iter = R_CAST(char*, dsp->next_out);
339 if (dsp->avail_in) {
340 assert(dst_iter == dst_end);
341 break;
342 }
343 }
344 const uint32_t nbytes = fread_unlocked(gzp->in, 1, kDecompressChunkSize, ff);
345 dsp->next_in = gzp->in;
346 dsp->avail_in = nbytes;
347 if (!nbytes) {
348 if (unlikely(!feof_unlocked(ff))) {
349 *errmsgp = strerror(errno);
350 return kPglRetReadFail;
351 }
352 if (unlikely(zerr == Z_OK)) {
353 *errmsgp = kShortErrRfileTruncatedGz;
354 return kPglRetDecompressFail;
355 }
356 // Normal EOF.
357 break;
358 }
359 } while (dst_iter != dst_end);
360 *dst_iterp = dst_iter;
361 return kPglRetSuccess;
362 }
363
ZstRawStreamRead(char * dst_end,FILE * ff,ZstRawDecompressStream * zstp,char ** dst_iterp,const char ** errmsgp)364 PglErr ZstRawStreamRead(char* dst_end, FILE* ff, ZstRawDecompressStream* zstp, char** dst_iterp, const char** errmsgp) {
365 if ((!zstp->ib.size) && feof_unlocked(ff)) {
366 return kPglRetSuccess;
367 }
368 // Sequentially dependent blocks limited to ~128 KiB.
369 char* dst_iter = *dst_iterp;
370 while (1) {
371 ZSTD_outBuffer zob = {R_CAST(unsigned char*, dst_iter), S_CAST(size_t, dst_end - dst_iter), 0};
372 // ib.size == 0 ok, no need to special-case rewind.
373 const uintptr_t read_size_hint = ZSTD_decompressStream(zstp->ds, &zob, &zstp->ib);
374 if (unlikely(ZSTD_isError(read_size_hint))) {
375 *errmsgp = ZSTD_getErrorName(read_size_hint);
376 return kPglRetDecompressFail;
377 }
378 dst_iter = &(dst_iter[zob.pos]);
379 if (dst_iter == dst_end) {
380 break;
381 }
382 // Decoder has flushed everything it could. Either we're at EOF, or we
383 // must load more.
384 unsigned char* in = S_CAST(unsigned char*, K_CAST(void*, zstp->ib.src));
385 const uint32_t n_inbytes = zstp->ib.size - zstp->ib.pos;
386 memmove(in, &(in[zstp->ib.pos]), n_inbytes);
387 unsigned char* load_start = &(in[n_inbytes]);
388 const uint32_t nbytes = fread_unlocked(load_start, 1, kDecompressChunkSize - n_inbytes, ff);
389 if (unlikely(ferror_unlocked(ff))) {
390 *errmsgp = strerror(errno);
391 return kPglRetReadFail;
392 }
393 zstp->ib.pos = 0;
394 zstp->ib.size = nbytes + n_inbytes;
395 if (!nbytes) {
396 if (unlikely(n_inbytes)) {
397 *errmsgp = kShortErrZstdPrefixUnknown;
398 return kPglRetDecompressFail;
399 }
400 break;
401 }
402 }
403 *dst_iterp = dst_iter;
404 return kPglRetSuccess;
405 }
406
// Reported (with kPglRetMalformedInput) when a line exceeds the reader's
// enforced_max_line_blen.
const char kShortErrLongLine[] = "Pathologically long line";
// Reported (with kPglRetMalformedInput) by TextFileOnlyEmptyLinesLeft() when
// a non-empty line appears where only empty lines were expected.
const char kShortErrInteriorEmptyLine[] = "Unexpected interior empty line";
409
TextFileAdvance(textFILE * txf_ptr)410 PglErr TextFileAdvance(textFILE* txf_ptr) {
411 textFILEMain* txfp = GetTxfp(txf_ptr);
412 TextFileBase* basep = &txfp->base;
413 if (basep->reterr) {
414 return basep->reterr;
415 }
416 PglErr reterr = kPglRetSuccess;
417 {
418 char* line_start = basep->consume_stop;
419 assert(basep->consume_iter == line_start);
420 char* dst = basep->dst;
421 char* dst_load_start;
422 while (1) {
423 const uint32_t dst_offset = line_start - dst;
424 const uint32_t dst_rem = basep->dst_len - dst_offset;
425 // (dst_rem guaranteed to be < basep->enforced_max_line_blen here, since
426 // otherwise we error out earlier.)
427 // Two cases:
428 // 1. Move (possibly empty) unfinished line to the beginning of the
429 // buffer.
430 // 2. Resize the buffer/report out-of-memory.
431 if (dst_rem < basep->dst_capacity - kDecompressChunkSize) {
432 if (dst_offset) {
433 memmove(dst, line_start, dst_rem);
434 }
435 } else {
436 if (unlikely(basep->dst_owned_by_consumer)) {
437 goto TextFileAdvance_ret_NOMEM;
438 }
439 uint32_t next_dst_capacity = basep->enforced_max_line_blen + kDecompressChunkSize;
440 if ((next_dst_capacity / 2) > basep->dst_capacity) {
441 next_dst_capacity = basep->dst_capacity * 2;
442 }
443 #ifndef __LP64__
444 if (next_dst_capacity >= 0x80000000U) {
445 goto TextFileAdvance_ret_NOMEM;
446 }
447 #endif
448 char* dst_next;
449 if (!dst_offset) {
450 dst_next = S_CAST(char*, realloc(dst, next_dst_capacity));
451 if (unlikely(!dst_next)) {
452 goto TextFileAdvance_ret_NOMEM;
453 }
454 } else {
455 dst_next = S_CAST(char*, malloc(next_dst_capacity));
456 if (unlikely(!dst_next)) {
457 goto TextFileAdvance_ret_NOMEM;
458 }
459 memcpy(dst_next, line_start, dst_rem);
460 }
461 basep->dst = dst_next;
462 dst = dst_next;
463 }
464 line_start = dst;
465 dst_load_start = &(dst[dst_rem]);
466 FILE* ff = basep->ff;
467 char* dst_iter = dst_load_start;
468 // We don't want to always fill the entire buffer here. The main plink2
469 // use case of textFILE is to just peek at an unknown-length header line
470 // with a maximal-length line-load buffer, compute a
471 // legitimate-line-length bound, and then (move-)construct a TextStream
472 // with the shorter buffer size.
473 // Instead, we load up to the smallest power of 2 >= (dst_rem + 1 MiB).
474 uintptr_t stop_offset = (2 * k1LU) << bsru32(dst_rem + kDecompressChunkSize - 1);
475 if (stop_offset > basep->dst_capacity) {
476 stop_offset = basep->dst_capacity;
477 }
478 char* dst_stop = &(dst[stop_offset]);
479 basep->consume_iter = dst;
480 switch (basep->file_type) {
481 case kFileUncompressed:
482 {
483 uint32_t rlen = dst_stop - dst_iter;
484 if (rlen > kMaxBytesPerIO) {
485 // We need to know how many bytes were read, so fread_checked()
486 // doesn't work.
487 // This is an if-statement instead of a while loop since rlen can
488 // never be larger than 2 * kMaxBytesPerIO.
489 const uint32_t nbytes = fread_unlocked(dst_iter, 1, kMaxBytesPerIO, ff);
490 if (nbytes < kMaxBytesPerIO) {
491 if (unlikely(ferror_unlocked(ff))) {
492 goto TextFileAdvance_ret_READ_FAIL;
493 }
494 basep->dst_len = nbytes + dst_rem;
495 break;
496 }
497 rlen -= kMaxBytesPerIO;
498 dst_iter = &(dst_iter[kMaxBytesPerIO]);
499 }
500 const uint32_t nbytes = fread_unlocked(dst_iter, 1, rlen, ff);
501 if (unlikely(ferror_unlocked(ff))) {
502 goto TextFileAdvance_ret_READ_FAIL;
503 }
504 dst_iter = &(dst_iter[nbytes]);
505 break;
506 }
507 case kFileGzip:
508 {
509 reterr = GzRawStreamRead(dst_stop, ff, &txfp->rds.gz, &dst_iter, &basep->errmsg);
510 if (unlikely(reterr)) {
511 goto TextFileAdvance_ret_1;
512 }
513 break;
514 }
515 case kFileBgzf:
516 {
517 // Fully independent blocks limited to 64 KiB.
518 // probable todo: move this to a BgzfRawStreamRead() function in
519 // plink2_bgzf (and move ZstRawStreamRead() to plink2_zstfile).
520 BgzfRawDecompressStream* bgzfp = &txfp->rds.bgzf;
521 if ((!bgzfp->in_size) && feof_unlocked(ff)) {
522 break;
523 }
524 struct libdeflate_decompressor* ldc = bgzfp->ldc;
525 unsigned char* in = bgzfp->in;
526 unsigned char* in_iter = &(in[bgzfp->in_pos]);
527 unsigned char* in_end = &(in[bgzfp->in_size]);
528 while (1) {
529 uint32_t n_inbytes = in_end - in_iter;
530 if (n_inbytes > 25) {
531 if (unlikely(!IsBgzfHeader(in_iter))) {
532 goto TextFileAdvance_ret_INVALID_BGZF;
533 }
534 # ifdef __arm__
535 # error "Unaligned accesses in TextFileAdvance()."
536 # endif
537 const uint32_t bsize_minus1 = *R_CAST(uint16_t*, &(in_iter[16]));
538 if (unlikely(bsize_minus1 < 25)) {
539 goto TextFileAdvance_ret_INVALID_BGZF;
540 }
541 if (bsize_minus1 < n_inbytes) {
542 // We have at least one fully-loaded compressed block.
543 // Decompress it if we have enough space.
544 const uint32_t in_size = bsize_minus1 - 25;
545 const uint32_t out_size = *R_CAST(uint32_t*, &(in_iter[in_size + 22]));
546 if (unlikely(out_size > 65536)) {
547 goto TextFileAdvance_ret_INVALID_BGZF;
548 }
549 if (out_size > S_CAST(uintptr_t, dst_stop - dst_iter)) {
550 break;
551 }
552 if (unlikely(libdeflate_deflate_decompress(ldc, &(in_iter[18]), in_size, dst_iter, out_size, nullptr))) {
553 goto TextFileAdvance_ret_INVALID_BGZF;
554 }
555 in_iter = &(in_iter[bsize_minus1 + 1]);
556 dst_iter = &(dst_iter[out_size]);
557 continue;
558 }
559 }
560 // Either we're at EOF, or we must load more.
561 memmove(in, in_iter, n_inbytes);
562 unsigned char* load_start = &(in[n_inbytes]);
563 const uint32_t nbytes = fread_unlocked(load_start, 1, kDecompressChunkSize - n_inbytes, ff);
564 if (unlikely(ferror_unlocked(ff))) {
565 goto TextFileAdvance_ret_READ_FAIL;
566 }
567 in_iter = in;
568 in_end = &(load_start[nbytes]);
569 bgzfp->in_size = in_end - in;
570 if (!nbytes) {
571 if (unlikely(n_inbytes)) {
572 goto TextFileAdvance_ret_INVALID_BGZF;
573 }
574 break;
575 }
576 }
577 bgzfp->in_pos = in_iter - in;
578 dst_stop = dst_iter;
579 break;
580 }
581 case kFileZstd:
582 {
583 reterr = ZstRawStreamRead(dst_stop, ff, &txfp->rds.zst, &dst_iter, &basep->errmsg);
584 if (unlikely(reterr)) {
585 goto TextFileAdvance_ret_1;
586 }
587 break;
588 }
589 }
590 basep->dst_len = dst_iter - dst;
591 if (!basep->dst_len) {
592 goto TextFileAdvance_ret_EOF;
593 }
594 if (dst_iter != dst_stop) {
595 // If last character of file isn't a newline, append one to simplify
596 // downstream code.
597 if (dst_iter[-1] != '\n') {
598 *dst_iter++ = '\n';
599 basep->dst_len += 1;
600 }
601 basep->consume_stop = dst_iter;
602 break;
603 }
604 char* last_byte_ptr = Memrchr(dst_load_start, '\n', dst_iter - dst_load_start);
605 if (last_byte_ptr) {
606 basep->consume_stop = &(last_byte_ptr[1]);
607 break;
608 }
609 // Buffer is full, and no '\n' is present. Restart the loop and try to
610 // load more data (extending the buffer if necessary), if we aren't
611 // already at/past the line-length limit.
612 if (basep->dst_len >= basep->enforced_max_line_blen) {
613 goto TextFileAdvance_ret_LONG_LINE;
614 }
615 }
616 if (unlikely(IsPathologicallyLongLineOrToken(dst, dst_load_start, basep->consume_stop, basep->enforced_max_line_blen))) {
617 goto TextFileAdvance_ret_LONG_LINE;
618 }
619 }
620 while (0) {
621 TextFileAdvance_ret_NOMEM:
622 reterr = kPglRetNomem;
623 break;
624 TextFileAdvance_ret_READ_FAIL:
625 reterr = kPglRetReadFail;
626 basep->errmsg = strerror(errno);
627 break;
628 TextFileAdvance_ret_LONG_LINE:
629 basep->errmsg = kShortErrLongLine;
630 reterr = kPglRetMalformedInput;
631 break;
632 TextFileAdvance_ret_INVALID_BGZF:
633 basep->errmsg = kShortErrInvalidBgzf;
634 reterr = kPglRetDecompressFail;
635 break;
636 TextFileAdvance_ret_EOF:
637 reterr = kPglRetEof;
638 break;
639 }
640 TextFileAdvance_ret_1:
641 basep->reterr = reterr;
642 return reterr;
643 }
644
TextFileOnlyEmptyLinesLeft(textFILE * txf_ptr)645 PglErr TextFileOnlyEmptyLinesLeft(textFILE* txf_ptr) {
646 TextFileBase* basep = &GetTxfp(txf_ptr)->base;
647 char* line_start = basep->consume_iter;
648 while (1) {
649 if (line_start == basep->consume_stop) {
650 basep->consume_iter = line_start;
651 PglErr reterr = TextFileAdvance(txf_ptr);
652 if (reterr) {
653 return reterr;
654 }
655 line_start = basep->consume_iter;
656 }
657 line_start = FirstNonTspace(line_start);
658 if (unlikely(!IsEolnKns(*line_start))) {
659 basep->reterr = kPglRetMalformedInput;
660 basep->errmsg = kShortErrInteriorEmptyLine;
661 return kPglRetMalformedInput;
662 }
663 line_start = AdvPastDelim(line_start, '\n');
664 }
665 }
666
// Rewinds txf_ptr to the beginning of its file so it can be re-read.
// It does nothing if the file isn't open, or if it holds an error status
// other than kPglRetEof.  Otherwise it does the following:
// * Clears the sticky status.
// * Empties the load buffer.
// * Resets the decompressor state for the detected file type.
void TextFileRewind(textFILE* txf_ptr) {
  textFILEMain* txfp = GetTxfp(txf_ptr);
  TextFileBase* basep = &txfp->base;
  if (!basep->ff) {
    return;
  }
  if (basep->reterr && (basep->reterr != kPglRetEof)) {
    return;
  }
  rewind(basep->ff);
  char* buf_start = basep->dst;
  basep->reterr = kPglRetSuccess;
  basep->dst_len = 0;
  basep->consume_iter = buf_start;
  basep->consume_stop = buf_start;
  switch (basep->file_type) {
    case kFileUncompressed:
      break;
    case kFileGzip:
      {
        txfp->rds.gz.ds.avail_in = 0;
#ifdef NDEBUG
        inflateReset(&txfp->rds.gz.ds);
#else
        const int errcode = inflateReset(&txfp->rds.gz.ds);
        assert(errcode == Z_OK);
#endif
        break;
      }
    case kFileBgzf:
      txfp->rds.bgzf.in_pos = 0;
      txfp->rds.bgzf.in_size = 0;
      break;
    default:
      // kFileZstd
      txfp->rds.zst.ib.pos = 0;
      txfp->rds.zst.ib.size = 0;
      ZSTD_DCtx_reset(txfp->rds.zst.ds, ZSTD_reset_session_only);
      break;
  }
}
698
CleanupTextFile(textFILE * txf_ptr,PglErr * reterrp)699 BoolErr CleanupTextFile(textFILE* txf_ptr, PglErr* reterrp) {
700 textFILEMain* txfp = GetTxfp(txf_ptr);
701 TextFileBase* basep = &txfp->base;
702 basep->consume_iter = nullptr;
703 basep->consume_stop = nullptr;
704 basep->reterr = kPglRetEof;
705 basep->errmsg = nullptr;
706 if (basep->dst && (!basep->dst_owned_by_consumer)) {
707 free(basep->dst);
708 basep->dst = nullptr;
709 }
710 if (basep->ff) {
711 if (basep->file_type != kFileUncompressed) {
712 if (basep->file_type == kFileZstd) {
713 if (txfp->rds.zst.ib.src) {
714 free_const(txfp->rds.zst.ib.src);
715 txfp->rds.zst.ib.src = nullptr;
716 }
717 if (txfp->rds.zst.ds) {
718 ZSTD_freeDStream(txfp->rds.zst.ds);
719 txfp->rds.zst.ds = nullptr;
720 }
721 } else if (basep->file_type == kFileBgzf) {
722 if (txfp->rds.bgzf.in) {
723 free(txfp->rds.bgzf.in);
724 txfp->rds.bgzf.in = nullptr;
725 }
726 if (txfp->rds.bgzf.ldc) {
727 libdeflate_free_decompressor(txfp->rds.bgzf.ldc);
728 txfp->rds.bgzf.ldc = nullptr;
729 }
730 } else {
731 // plain gzip
732 if (txfp->rds.gz.in) {
733 free(txfp->rds.gz.in);
734 txfp->rds.gz.in = nullptr;
735 }
736 if (txfp->rds.gz.ds_initialized) {
737 inflateEnd(&txfp->rds.gz.ds);
738 }
739 }
740 }
741 if (unlikely(fclose_null(&basep->ff))) {
742 if (!reterrp) {
743 return 1;
744 }
745 if (*reterrp == kPglRetSuccess) {
746 *reterrp = kPglRetReadFail;
747 return 1;
748 }
749 }
750 }
751 return 0;
752 }
753
754
// Puts a TextStream into the not-yet-opened state.  Its base fields are
// erased and it has no synchronization block attached.
void PreinitTextStream(TextStream* txs_ptr) {
  TextStreamMain* txsp = GetTxsp(txs_ptr);
  txsp->syncp = nullptr;
  EraseTextFileBase(&txsp->base);
}
760
761 // This type of code is especially bug-prone (ESR would call it a "defect
762 // attractor"). Goal is to get it right, and fast enough to be a major win
763 // over gzgets()... and then not worry about it again for years.
TextStreamThread(void * raw_arg)764 THREAD_FUNC_DECL TextStreamThread(void* raw_arg) {
765 TextStreamMain* context = S_CAST(TextStreamMain*, raw_arg);
766 TextFileBase* basep = &context->base;
767 TextStreamSync* syncp = context->syncp;
768 FileCompressionType file_type = basep->file_type;
769 RawMtDecompressStream* rdsp = &context->rds;
770 FILE* ff = basep->ff;
771 char* buf = basep->dst;
772 char* buf_end = &(buf[basep->dst_capacity]);
773 char* cur_block_start = basep->consume_stop;
774 char* read_head = &(buf[basep->dst_len]);
775
776 // We can either be reading/decompressing into memory past the bytes passed
777 // to the consumer, or we can be doing it before those bytes.
778 // In the first case, read_stop is buf_end, but it gets changed to the
779 // latest value of consume_tail when we return to the front of the buffer.
780 // In the second case, read_stop is the position of the first passed byte.
781 char* read_stop = buf_end;
782 #ifdef _WIN32
783 CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
784 HANDLE reader_progress_event = syncp->reader_progress_event;
785 HANDLE consumer_progress_event = syncp->consumer_progress_event;
786 #else
787 pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
788 pthread_cond_t* reader_progress_condvarp = &syncp->reader_progress_condvar;
789 pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
790 #endif
791 const uint32_t enforced_max_line_blen = basep->enforced_max_line_blen;
792 const char* new_fname = nullptr;
793 const uint32_t is_token_stream = (enforced_max_line_blen == 0);
794 while (1) {
795 TxsInterrupt interrupt = kTxsInterruptNone;
796 PglErr reterr;
797 TxsInterrupt min_interrupt;
798 while (1) {
799 uintptr_t read_attempt_size = read_stop - read_head;
800 if (!read_attempt_size) {
801 const uint32_t memmove_required = (read_stop == buf_end);
802 if (unlikely((cur_block_start == buf) && memmove_required)) {
803 // May need to modify this predicate if we ever allow is_token_stream
804 // && !dst_owned_by_consumer.
805 const uint32_t prev_capacity = buf_end - buf;
806 if (basep->dst_owned_by_consumer || (prev_capacity >= enforced_max_line_blen)) {
807 goto TextStreamThread_LONG_LINE;
808 }
809 // Try to expand buffer.
810 uint32_t next_dst_capacity = enforced_max_line_blen + kDecompressChunkSize;
811 if ((next_dst_capacity / 2) > basep->dst_capacity) {
812 next_dst_capacity = basep->dst_capacity * 2;
813 }
814 #ifndef __LP64__
815 if (next_dst_capacity >= 0x80000000U) {
816 goto TextStreamThread_NOMEM;
817 }
818 #endif
819 char* dst_next = S_CAST(char*, realloc(buf, next_dst_capacity));
820 if (unlikely(!dst_next)) {
821 goto TextStreamThread_NOMEM;
822 }
823 #ifdef _WIN32
824 EnterCriticalSection(critical_sectionp);
825 #else
826 pthread_mutex_lock(sync_mutexp);
827 #endif
828 basep->dst = dst_next;
829 basep->dst_capacity = next_dst_capacity;
830 syncp->consume_tail = dst_next;
831 syncp->available_end = dst_next;
832 syncp->dst_reallocated = 1;
833 #ifdef _WIN32
834 LeaveCriticalSection(critical_sectionp);
835 #else
836 pthread_mutex_unlock(sync_mutexp);
837 #endif
838 buf = dst_next;
839 buf_end = &(buf[next_dst_capacity]);
840 cur_block_start = buf;
841 read_head = &(buf[prev_capacity]);
842 read_stop = &(buf[next_dst_capacity]);
843 continue;
844 }
845 // We cannot continue reading forward. Cases:
846 // 1. read_stop == buf_end, cur_block_start != buf. This means we're
847 // in the middle of reading/decompressing a long line, and want to
848 // wait for consume_tail == cur_block_start, so we can memmove all
849 // the bytes back and continue reading forward. (Tried
850 // relaxing this to
851 // consume_tail >= (buf_end - cur_block_start) + margin
852 // for various values of margin, but that didn't make a meaningful
853 // difference.)
854 // 2. read_stop == buf_end, cur_block_start == buf. We failed with a
855 // long-line error here.
856 // 3. read_stop < buf_end (usual case). This means the consumer may
857 // not be done handling some bytes-in-front we handed off earlier.
858 // We are waiting for consume_tail <= cur_block_start, which means
859 // all bytes in front have been consumed and we're free to continue
860 // reading forward.
861 char* latest_consume_tail;
862 #ifdef _WIN32
863 // bugfix (7 May 2018): when consumer thread is waiting with
864 // syncp->consume_tail == cur_block_start, read_stop is near but not at
865 // buf_end, and there's no '\n' in the subsequent read, we can reach
866 // here a second time without releasing the consumer, so we'd enter
867 // deadlock if we unconditionally wait on consumer_progress_event (and
868 // in the Linux/OS X case, we'd be waiting for a spurious wakeup to
869 // save us).
870 // However, if memmove_required isn't true, we have to wait first; see
871 // the 21 Mar bugfix.
872 if (!memmove_required) {
873 goto TextStreamThread_wait_first;
874 }
875 while (1) {
876 EnterCriticalSection(critical_sectionp);
877 interrupt = syncp->interrupt;
878 if (interrupt != kTxsInterruptNone) {
879 goto TextStreamThread_INTERRUPT;
880 }
881 latest_consume_tail = syncp->consume_tail;
882 if (memmove_required) {
883 if (latest_consume_tail == cur_block_start) {
884 syncp->consume_tail = buf;
885 syncp->available_end = buf;
886 break;
887 }
888 } else if (latest_consume_tail <= cur_block_start) {
889 break;
890 }
891 LeaveCriticalSection(critical_sectionp);
892 TextStreamThread_wait_first:
893 WaitForSingleObject(consumer_progress_event, INFINITE);
894 }
895 // bugfix (23 Mar 2018): didn't always leave the critical section
896 LeaveCriticalSection(critical_sectionp);
897 #else
898 pthread_mutex_lock(sync_mutexp);
899 if (!memmove_required) {
900 // Wait for all bytes in front of read_stop to be consumed.
901 goto TextStreamThread_wait_first;
902 }
903 while (1) {
904 interrupt = syncp->interrupt;
905 if (interrupt != kTxsInterruptNone) {
906 goto TextStreamThread_INTERRUPT;
907 }
908 latest_consume_tail = syncp->consume_tail;
909 if (memmove_required) {
910 if (latest_consume_tail == cur_block_start) {
911 // All bytes have been consumed; memmove is now safe.
912 // bugfix (2 Oct 2018): Previously, this just set
913 // syncp->cur_circular_end = cur_block_start, but that created
914 // TWO consume_iter == available_end == cur_circular_end cases,
915 // one of which was handled incorrectly.
916 syncp->consume_tail = buf;
917 syncp->available_end = buf;
918 break;
919 }
920 // There are bytes behind cur_block_start that haven't been
921 // consumed yet. This is possible on the first iteration through
922 // the loop, since consumer_progress_state may have been set for a
923 // reason we aren't interested in.
924
925 } else if (latest_consume_tail <= cur_block_start) {
926 // All bytes in front of read_stop have been consumed.
927 break;
928 }
929 TextStreamThread_wait_first:
930 while (!syncp->consumer_progress_state) {
931 pthread_cond_wait(consumer_progress_condvarp, sync_mutexp);
932 }
933 syncp->consumer_progress_state = 0;
934 }
935 pthread_mutex_unlock(sync_mutexp);
936 #endif
937 if (read_stop == buf_end) {
938 const uint32_t cur_memmove_len = buf_end - cur_block_start;
939 memmove(buf, cur_block_start, cur_memmove_len);
940 cur_block_start = buf;
941 read_head = &(buf[cur_memmove_len]);
942 } else {
943 read_stop = buf_end;
944 }
945 continue;
946 }
947 if (read_attempt_size > kDecompressChunkSize) {
948 read_attempt_size = kDecompressChunkSize;
949 }
950 char* cur_read_end = read_head;
951 char* cur_read_stop = &(read_head[read_attempt_size]);
952 switch (file_type) {
953 case kFileUncompressed:
954 {
955 cur_read_end += fread_unlocked(read_head, 1, read_attempt_size, ff);
956 if (unlikely(ferror_unlocked(ff))) {
957 goto TextStreamThread_READ_FAIL;
958 }
959 break;
960 }
961 case kFileGzip:
962 {
963 reterr = GzRawStreamRead(cur_read_stop, ff, &rdsp->gz, &cur_read_end, &syncp->errmsg);
964 if (unlikely(reterr)) {
965 goto TextStreamThread_MISC_FAIL;
966 }
967 break;
968 }
969 case kFileBgzf:
970 {
971 reterr = BgzfRawMtStreamRead(R_CAST(unsigned char*, cur_read_stop), &rdsp->bgzf, R_CAST(unsigned char**, &cur_read_end), &syncp->errmsg);
972 if (unlikely(reterr)) {
973 goto TextStreamThread_MISC_FAIL;
974 }
975 break;
976 }
977 case kFileZstd:
978 {
979 reterr = ZstRawStreamRead(cur_read_stop, ff, &rdsp->zst, &cur_read_end, &syncp->errmsg);
980 if (unlikely(reterr)) {
981 goto TextStreamThread_MISC_FAIL;
982 }
983 break;
984 }
985 }
986 if (cur_read_end < cur_read_stop) {
987 char* final_read_head = cur_read_end;
988 if (cur_block_start != final_read_head) {
989 if (final_read_head[-1] != '\n') {
990 // Append '\n' so consumer can always use rawmemchr(., '\n') to
991 // find the end of the current line.
992 *final_read_head++ = '\n';
993 }
994 }
995 // Still want to consistently enforce max line/token length.
996 if (unlikely(IsPathologicallyLongLineOrToken(cur_block_start, read_head, final_read_head, enforced_max_line_blen))) {
997 goto TextStreamThread_LONG_LINE;
998 }
999 read_head = final_read_head;
1000 goto TextStreamThread_EOF;
1001 }
1002 char* last_byte_ptr;
1003 if (!is_token_stream) {
1004 last_byte_ptr = Memrchr(read_head, '\n', read_attempt_size);
1005 } else {
1006 last_byte_ptr = LastSpaceOrEoln(read_head, read_attempt_size);
1007 }
1008 if (last_byte_ptr) {
1009 char* next_available_end = &(last_byte_ptr[1]);
1010 if (unlikely(IsPathologicallyLongLineOrToken(cur_block_start, read_head, next_available_end, enforced_max_line_blen))) {
1011 goto TextStreamThread_LONG_LINE;
1012 }
1013 #ifdef _WIN32
1014 EnterCriticalSection(critical_sectionp);
1015 #else
1016 pthread_mutex_lock(sync_mutexp);
1017 #endif
1018 interrupt = syncp->interrupt;
1019 if (interrupt != kTxsInterruptNone) {
1020 goto TextStreamThread_INTERRUPT;
1021 }
1022 char* latest_consume_tail = syncp->consume_tail;
1023 const uint32_t all_later_bytes_consumed = (latest_consume_tail <= cur_block_start);
1024 const uint32_t return_to_start = all_later_bytes_consumed && (latest_consume_tail >= &(buf[kDecompressChunkSize]));
1025 if (return_to_start) {
1026 // bugfix (2 Oct 2018): This was previously setting
1027 // syncp->available_end = next_available_end too, and that was being
1028 // handled as a special case which conflicted with a rare legitimate
1029 // case.
1030 syncp->cur_circular_end = next_available_end;
1031 syncp->available_end = buf;
1032 } else {
1033 syncp->available_end = next_available_end;
1034 }
1035 #ifdef _WIN32
1036 // bugfix (23 Mar 2018): this needs to be in the critical section,
1037 // otherwise there's a danger of this resetting legitimate progress
1038 ResetEvent(consumer_progress_event);
1039 SetEvent(reader_progress_event);
1040 LeaveCriticalSection(critical_sectionp);
1041 #else
1042 // bugfix (21 Mar 2018): must force consumer_progress_state to 0 (or
1043 // ResetEvent(consumer_progress_event); otherwise the other wait loop's
1044 // read_stop = buf_end assignment may occur before all later bytes are
1045 // actually consumed, in the next_available_end == latest_consume_tail
1046 // edge case.
1047 syncp->consumer_progress_state = 0;
1048 pthread_cond_signal(reader_progress_condvarp);
1049 pthread_mutex_unlock(sync_mutexp);
1050 #endif
1051 if (return_to_start) {
1052 // Best to return to the beginning of the buffer.
1053 // (Note that read_attempt_size is guaranteed to be
1054 // <= kDecompressChunkSize.)
1055 const uintptr_t trailing_byte_ct = cur_read_end - next_available_end;
1056 memcpy(buf, next_available_end, trailing_byte_ct);
1057 cur_block_start = buf;
1058 read_head = &(buf[trailing_byte_ct]);
1059 // May as well reduce false sharing risk.
1060 read_stop = R_CAST(char*, RoundDownPow2(R_CAST(uintptr_t, latest_consume_tail), kCacheline));
1061 continue;
1062 }
1063 if (all_later_bytes_consumed) {
1064 read_stop = buf_end;
1065 } else {
1066 read_stop = R_CAST(char*, RoundDownPow2(R_CAST(uintptr_t, latest_consume_tail), kCacheline));
1067 }
1068 cur_block_start = next_available_end;
1069 }
1070 read_head = cur_read_end;
1071 }
1072 while (0) {
1073 TextStreamThread_NOMEM:
1074 min_interrupt = kTxsInterruptShutdown;
1075 reterr = kPglRetNomem;
1076 break;
1077 TextStreamThread_OPEN_FAIL:
1078 min_interrupt = kTxsInterruptShutdown;
1079 syncp->errmsg = strerror(errno);
1080 reterr = kPglRetOpenFail;
1081 break;
1082 TextStreamThread_READ_FAIL:
1083 min_interrupt = kTxsInterruptShutdown;
1084 syncp->errmsg = strerror(errno);
1085 reterr = kPglRetReadFail;
1086 break;
1087 TextStreamThread_LONG_LINE:
1088 min_interrupt = kTxsInterruptShutdown;
1089 syncp->errmsg = kShortErrLongLine;
1090 reterr = kPglRetMalformedInput;
1091 break;
1092 TextStreamThread_EOF:
1093 min_interrupt = kTxsInterruptRetarget;
1094 reterr = kPglRetEof;
1095 break;
1096 TextStreamThread_MISC_FAIL:
1097 min_interrupt = kTxsInterruptShutdown;
1098 break;
1099 }
1100 // We need to wait for a message from the consumer before we can usefully
1101 // proceed.
1102 // More precisely:
1103 // * In the eof subcase, we're waiting for either a rewind or shutdown
1104 // request.
1105 // * In the error subcase, we're just waiting for the shutdown request.
1106
1107 // Pass the error code back.
1108 #ifdef _WIN32
1109 EnterCriticalSection(critical_sectionp);
1110 #else
1111 pthread_mutex_lock(sync_mutexp);
1112 #endif
1113 syncp->reterr = reterr;
1114 interrupt = syncp->interrupt;
1115 if (interrupt >= min_interrupt) {
1116 // It's our lucky day: we don't need to wait again.
1117 goto TextStreamThread_INTERRUPT;
1118 }
1119 if (reterr == kPglRetEof) {
1120 syncp->available_end = read_head;
1121 }
1122 #ifdef _WIN32
1123 SetEvent(reader_progress_event);
1124 LeaveCriticalSection(critical_sectionp);
1125 while (1) {
1126 WaitForSingleObject(consumer_progress_event, INFINITE);
1127 EnterCriticalSection(critical_sectionp);
1128 interrupt = syncp->interrupt;
1129 if (interrupt >= min_interrupt) {
1130 break;
1131 }
1132 LeaveCriticalSection(critical_sectionp);
1133 }
1134 #else
1135 pthread_cond_signal(reader_progress_condvarp);
1136 do {
1137 while (!syncp->consumer_progress_state) {
1138 pthread_cond_wait(consumer_progress_condvarp, sync_mutexp);
1139 }
1140 syncp->consumer_progress_state = 0;
1141 interrupt = syncp->interrupt;
1142 } while (interrupt < min_interrupt);
1143 #endif
1144 TextStreamThread_INTERRUPT:
1145 // must be in critical section here, or be holding the mutex.
1146 if (interrupt == kTxsInterruptRetarget) {
1147 new_fname = syncp->new_fname;
1148 syncp->interrupt = kTxsInterruptNone;
1149 syncp->reterr = kPglRetSuccess;
1150 }
1151 #ifdef _WIN32
1152 LeaveCriticalSection(critical_sectionp);
1153 #else
1154 pthread_mutex_unlock(sync_mutexp);
1155 #endif
1156 if (interrupt == kTxsInterruptShutdown) {
1157 // possible todo: close the file here
1158 THREAD_RETURN;
1159 }
1160 assert(interrupt == kTxsInterruptRetarget);
1161 read_head = buf;
1162 if (!new_fname) {
1163 if (file_type == kFileBgzf) {
1164 reterr = BgzfRawMtStreamRewind(&rdsp->bgzf, &syncp->errmsg);
1165 if (unlikely(reterr)) {
1166 goto TextStreamThread_MISC_FAIL;
1167 }
1168 } else {
1169 // See TextFileRewind().
1170 rewind(ff);
1171 if (file_type != kFileUncompressed) {
1172 if (file_type == kFileGzip) {
1173 rdsp->gz.ds.avail_in = 0;
1174 #ifdef NDEBUG
1175 inflateReset(&rdsp->gz.ds);
1176 #else
1177 const int errcode = inflateReset(&rdsp->gz.ds);
1178 assert(errcode == Z_OK);
1179 #endif
1180 } else {
1181 // kFileZstd
1182 rdsp->zst.ib.size = 0;
1183 rdsp->zst.ib.pos = 0;
1184 ZSTD_DCtx_reset(rdsp->zst.ds, ZSTD_reset_session_only);
1185 }
1186 }
1187 }
1188 } else {
1189 // Switch to another file, with less creation/destruction of resources.
1190 FILE* next_ff = fopen(new_fname, FOPEN_RB);
1191 if (unlikely(!next_ff)) {
1192 goto TextStreamThread_OPEN_FAIL;
1193 }
1194 // See TextFileOpenInternal().
1195 uint32_t nbytes = fread_unlocked(buf, 1, 16, next_ff);
1196 FileCompressionType next_file_type = kFileUncompressed;
1197 if (nbytes >= 4) {
1198 const uint32_t magic4 = *R_CAST(uint32_t*, buf);
1199 if (IsZstdFrame(magic4)) {
1200 next_file_type = kFileZstd;
1201 } else if ((magic4 << 8) == 0x088b1f00) {
1202 if ((nbytes == 16) && IsBgzfHeader(buf)) {
1203 next_file_type = kFileBgzf;
1204 } else {
1205 next_file_type = kFileGzip;
1206 }
1207 }
1208 }
1209 if (file_type != next_file_type) {
1210 // Destroy old type-specific resources, and allocate new ones.
1211 if (file_type == kFileGzip) {
1212 free(rdsp->gz.in);
1213 inflateEnd(&rdsp->gz.ds);
1214 } else if (file_type == kFileBgzf) {
1215 CleanupBgzfRawMtStream(&rdsp->bgzf);
1216 } else if (file_type == kFileZstd) {
1217 free_const(rdsp->zst.ib.src);
1218 ZSTD_freeDStream(rdsp->zst.ds);
1219 }
1220
1221 if (unlikely(fclose(ff))) {
1222 fclose(next_ff);
1223 goto TextStreamThread_READ_FAIL;
1224 }
1225 ff = next_ff;
1226 basep->ff = ff;
1227 file_type = next_file_type;
1228 basep->file_type = file_type;
1229 switch (file_type) {
1230 case kFileUncompressed:
1231 read_head = &(read_head[nbytes]);
1232 break;
1233 case kFileGzip:
1234 if (unlikely(GzRawInit(buf, nbytes, &rdsp->gz))) {
1235 goto TextStreamThread_NOMEM;
1236 }
1237 break;
1238 case kFileBgzf:
1239 reterr = BgzfRawMtStreamInit(buf, context->decompress_thread_ct, ff, nullptr, &rdsp->bgzf, &syncp->errmsg);
1240 if (unlikely(reterr)) {
1241 goto TextStreamThread_MISC_FAIL;
1242 }
1243 // bugfix (5 Oct 2019): forgot this break
1244 break;
1245 case kFileZstd:
1246 if (unlikely(ZstRawInit(buf, nbytes, &rdsp->zst))) {
1247 goto TextStreamThread_NOMEM;
1248 }
1249 break;
1250 }
1251 } else {
1252 switch (file_type) {
1253 case kFileUncompressed:
1254 read_head = &(read_head[nbytes]);
1255 break;
1256 // Rest of this is similar to rewind.
1257 case kFileGzip:
1258 {
1259 GzRawDecompressStream* gzp = &rdsp->gz;
1260 z_stream* dsp = &gzp->ds;
1261 #ifdef NDEBUG
1262 inflateReset(dsp);
1263 #else
1264 const int errcode = inflateReset(dsp);
1265 assert(errcode == Z_OK);
1266 #endif
1267 memcpy(gzp->in, buf, nbytes);
1268 dsp->next_in = gzp->in;
1269 dsp->avail_in = nbytes;
1270 break;
1271 }
1272 case kFileBgzf:
1273 {
1274 reterr = BgzfRawMtStreamRetarget(buf, &rdsp->bgzf, next_ff, &syncp->errmsg);
1275 if (unlikely(reterr)) {
1276 fclose(next_ff);
1277 goto TextStreamThread_MISC_FAIL;
1278 }
1279 break;
1280 }
1281 case kFileZstd:
1282 {
1283 ZstRawDecompressStream* zstp = &rdsp->zst;
1284 ZSTD_DCtx_reset(zstp->ds, ZSTD_reset_session_only);
1285 memcpy(K_CAST(void*, zstp->ib.src), buf, nbytes);
1286 zstp->ib.size = nbytes;
1287 zstp->ib.pos = 0;
1288 break;
1289 }
1290 }
1291 if (unlikely(fclose(ff))) {
1292 fclose(next_ff);
1293 goto TextStreamThread_READ_FAIL;
1294 }
1295 ff = next_ff;
1296 basep->ff = ff;
1297 }
1298 }
1299 cur_block_start = buf;
1300 read_stop = buf_end;
1301 }
1302 }
1303
// Error message TextStreamOpenEx() stores in errmsg when the textFILE passed
// in for move-construction is not open, or TextFileErrcode() reports a
// nonzero code for it.
const char kShortErrRfileInvalid[] = "TextStreamOpenEx can't be called with a closed or error-state textFILE";
1305
// Opens a TextStream and launches its background reader thread
// (TextStreamThread).  The stream is either opened fresh on fname (when
// txf_ptr is null) or move-constructed from an already-open textFILE.
//
// Parameters:
//   fname: file to open; only used when txf_ptr is null.
//   enforced_max_line_blen: line/token length limit stored in the base.
//   dst_capacity: capacity of the consumer-visible buffer.
//   decompress_thread_ct: stored in txsp->decompress_thread_ct, and passed to
//     BgzfRawMtStreamInit() when move-constructing from a BGZF textFILE.
//   txf_ptr: optional open textFILE to take over.  After the initial
//     validity checks pass, its base is erased (EraseTextFileBase()) whether
//     or not the rest of the call succeeds.
//   dst: buffer forwarded to TextFileOpenInternal() when txf_ptr is null
//     (presumably nullptr requests an internally allocated buffer -- TODO
//     confirm against TextFileOpenInternal()).
//   txs_ptr: TextStream to initialize.
//
// Returns kPglRetSuccess on success.  If the source is already at eof, also
// returns kPglRetSuccess, but sets txs_basep->reterr = kPglRetEof and does
// NOT allocate syncp or start a reader thread.  On failure, the error code
// is returned and also stored in txs_basep->reterr.
PglErr TextStreamOpenEx(const char* fname, uint32_t enforced_max_line_blen, uint32_t dst_capacity, uint32_t decompress_thread_ct, textFILE* txf_ptr, char* dst, TextStream* txs_ptr) {
  TextStreamMain* txsp = GetTxsp(txs_ptr);
  TextFileBase* txs_basep = &txsp->base;
  PglErr reterr = kPglRetSuccess;
  {
    txsp->decompress_thread_ct = decompress_thread_ct;
    if (txf_ptr) {
      // Move-construct (unless there was an error, or file is not opened)
      if (unlikely((!TextFileIsOpen(txf_ptr)) || TextFileErrcode(txf_ptr))) {
        reterr = kPglRetImproperFunctionCall;
        txs_basep->errmsg = kShortErrRfileInvalid;
        goto TextStreamOpenEx_ret_1;
      }
      if (unlikely(TextIsOpen(txs_ptr))) {
        reterr = kPglRetImproperFunctionCall;
        txs_basep->errmsg = kShortErrRfileAlreadyOpen;
        goto TextStreamOpenEx_ret_1;
      }
      textFILEMain* txfp = GetTxfp(txf_ptr);
      *txs_basep = txfp->base;  // struct copy
      // Simplify TextStreamThread() initialization.
      // Shift the not-yet-consumed bytes [consume_iter, dst + dst_len) to the
      // front of dst, so that consume_iter == dst afterward.
      const uint32_t backfill_ct = txs_basep->consume_iter - txs_basep->dst;
      if (backfill_ct) {
        txs_basep->dst_len -= backfill_ct;
        memmove(txs_basep->dst, txs_basep->consume_iter, txs_basep->dst_len);
        txs_basep->consume_iter = txs_basep->dst;
        txs_basep->consume_stop -= backfill_ct;
      }
      txs_basep->enforced_max_line_blen = enforced_max_line_blen;
      assert(txs_basep->dst_len <= dst_capacity);
      txs_basep->dst_capacity = dst_capacity;
      // Inherit the textFILE's status.  Since TextFileErrcode() was checked
      // above, this is presumably either success or eof.
      reterr = txfp->base.reterr;
      const FileCompressionType file_type = txfp->base.file_type;
      // Take over the decompressor state: gzip/zstd state is copied as-is,
      // while BGZF state is handed to a new multithreaded stream.
      if (file_type != kFileUncompressed) {
        if (file_type == kFileGzip) {
          txsp->rds.gz = txfp->rds.gz;
        } else if (file_type == kFileZstd) {
          txsp->rds.zst = txfp->rds.zst;
        } else {
          // NOTE(review): this assignment replaces the status inherited
          // above.  If that status was kPglRetEof and initialization
          // succeeds, a BGZF stream still gets a reader thread, unlike the
          // other file types.  Confirm this is intended.
          reterr = BgzfRawMtStreamInit(nullptr, decompress_thread_ct, txs_basep->ff, &txfp->rds.bgzf, &txsp->rds.bgzf, &txs_basep->errmsg);
          if (unlikely(reterr)) {
            EraseTextFileBase(&txfp->base);
            goto TextStreamOpenEx_ret_1;
          }
        }
      }
      EraseTextFileBase(&txfp->base);
    } else {
      reterr = TextFileOpenInternal(fname, enforced_max_line_blen, dst_capacity, dst, nullptr, txsp);
    }
    if (reterr) {
      if (reterr == kPglRetEof) {
        // Nothing left to read: record eof, report success, and skip
        // launching the reader thread.
        txs_basep->reterr = kPglRetEof;
        return kPglRetSuccess;
      }
      goto TextStreamOpenEx_ret_1;
    }
    assert(!txsp->syncp);
    TextStreamSync* syncp;
    if (unlikely(cachealigned_malloc(RoundUpPow2(sizeof(TextStreamSync), kCacheline), &syncp))) {
      goto TextStreamOpenEx_ret_NOMEM;
    }
    txsp->syncp = syncp;
    // Initial shared state: nothing consumed yet (consume_tail at the start
    // of dst), and everything up to consume_stop is already available.
    dst = txs_basep->dst;
    syncp->consume_tail = dst;
    syncp->cur_circular_end = nullptr;
    syncp->available_end = txs_basep->consume_stop;
    syncp->errmsg = nullptr;
    syncp->reterr = kPglRetSuccess;
    syncp->dst_reallocated = 0;
    syncp->interrupt = kTxsInterruptNone;
    syncp->new_fname = nullptr;
#ifdef _WIN32
    // read_thread stays nullptr unless every step below succeeds.
    // CleanupTextStream() checks it to decide whether to tear down the
    // critical section/events/thread.  Each failure path below releases
    // what was created so far.
    syncp->read_thread = nullptr;
    // apparently this can raise a low-memory exception in older Windows
    // versions, but that's not really our problem.
    InitializeCriticalSection(&syncp->critical_section);

    // Auto-reset events, initially non-signaled.
    syncp->reader_progress_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
    if (unlikely(!syncp->reader_progress_event)) {
      DeleteCriticalSection(&syncp->critical_section);
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
    syncp->consumer_progress_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
    if (unlikely(!syncp->consumer_progress_event)) {
      DeleteCriticalSection(&syncp->critical_section);
      CloseHandle(syncp->reader_progress_event);
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
    syncp->read_thread = R_CAST(HANDLE, _beginthreadex(nullptr, kDefaultThreadStack, TextStreamThread, txsp, 0, nullptr));
    if (unlikely(!syncp->read_thread)) {
      DeleteCriticalSection(&syncp->critical_section);
      CloseHandle(syncp->consumer_progress_event);
      CloseHandle(syncp->reader_progress_event);
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
#else
    // sync_init_state records how much has been initialized, so that
    // CleanupTextStream() tears down exactly that much:
    //   1 = mutex, 2 = + reader_progress_condvar,
    //   3 = + consumer_progress_condvar, 4 = + reader thread created.
    syncp->sync_init_state = 0;
    if (unlikely(pthread_mutex_init(&syncp->sync_mutex, nullptr))) {
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
    syncp->sync_init_state = 1;
    if (unlikely(pthread_cond_init(&syncp->reader_progress_condvar, nullptr))) {
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
    syncp->sync_init_state = 2;
    syncp->consumer_progress_state = 0;
    if (unlikely(pthread_cond_init(&syncp->consumer_progress_condvar, nullptr))) {
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
    syncp->sync_init_state = 3;
# ifndef __cplusplus
    // In C builds, a local small-stack attribute object is used; C++ builds
    // use the shared g_thread_startup attribute instead.
    pthread_attr_t smallstack_thread_attr;
    if (unlikely(pthread_attr_init(&smallstack_thread_attr))) {
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
    pthread_attr_setstacksize(&smallstack_thread_attr, kDefaultThreadStack);
# endif
    if (unlikely(pthread_create(&syncp->read_thread,
# ifdef __cplusplus
                                &g_thread_startup.smallstack_thread_attr,
# else
                                &smallstack_thread_attr,
# endif
                                TextStreamThread, txsp))) {
# ifndef __cplusplus
      pthread_attr_destroy(&smallstack_thread_attr);
# endif
      goto TextStreamOpenEx_ret_THREAD_CREATE_FAIL;
    }
# ifndef __cplusplus
    pthread_attr_destroy(&smallstack_thread_attr);
# endif
    syncp->sync_init_state = 4;
#endif
  }
  // Error labels: each sets reterr, then control reaches
  // TextStreamOpenEx_ret_1.
  while (0) {
  TextStreamOpenEx_ret_NOMEM:
    reterr = kPglRetNomem;
    break;
  TextStreamOpenEx_ret_THREAD_CREATE_FAIL:
    reterr = kPglRetThreadCreateFail;
    break;
  }
 TextStreamOpenEx_ret_1:
  txs_basep->reterr = reterr;
  return reterr;
}
1454
TextDecompressThreadCt(const TextStream * txs_ptr)1455 uint32_t TextDecompressThreadCt(const TextStream* txs_ptr) {
1456 const TextStreamMain* txsp = GetTxspK(txs_ptr);
1457 FileCompressionType file_type = txsp->base.file_type;
1458 if (file_type == kFileUncompressed) {
1459 return 0;
1460 }
1461 if (file_type != kFileBgzf) {
1462 return 1;
1463 }
1464 return GetThreadCtTg(&txsp->rds.bgzf.tg);
1465 }
1466
// Consumer side of the TextStream handshake: makes the next range of bytes
// available, blocking on the reader thread when nothing is ready.
//
// Callers in this file set basep->consume_iter to the end of the bytes they
// have finished with before calling.  That position is published to the
// reader thread as syncp->consume_tail, so the reader may reuse the buffer
// behind it.
//
// Returns:
//   kPglRetSuccess: [basep->consume_iter, basep->consume_stop) is the next
//     range to consume.
//   kPglRetEof: the reader reported eof and every available byte has been
//     consumed; basep->reterr is set to kPglRetEof.
//   any other code: a reader error, copied into basep->reterr and
//     basep->errmsg.
PglErr TextAdvance(TextStream* txs_ptr) {
  TextStreamMain* txsp = GetTxsp(txs_ptr);
  TextFileBase* basep = &txsp->base;
  char* consume_iter = basep->consume_iter;
  TextStreamSync* syncp = txsp->syncp;
#ifdef _WIN32
  CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
  HANDLE consumer_progress_event = syncp->consumer_progress_event;
  while (1) {
    EnterCriticalSection(critical_sectionp);
    const PglErr reterr = syncp->reterr;
    if (unlikely((reterr != kPglRetSuccess) && (reterr != kPglRetEof))) {
      basep->errmsg = syncp->errmsg;
      LeaveCriticalSection(critical_sectionp);
      basep->reterr = reterr;
      // No need to set consumer_progress event here, just let the cleanup
      // routine take care of that.
      return reterr;
    }
    char* available_end = syncp->available_end;
    char* cur_circular_end = syncp->cur_circular_end;
    // Reached the wraparound point: the reader has continued writing at
    // the start of the buffer, so resume consuming from there.
    if (consume_iter == cur_circular_end) {
      char* buf = basep->dst;
      consume_iter = buf;
      basep->consume_iter = buf;
      cur_circular_end = nullptr;
      syncp->cur_circular_end = nullptr;
      if (consume_iter != available_end) {
        SetEvent(consumer_progress_event);
      }
    }
    // NOTE(review): basep->consume_iter is not assigned on this path; confirm
    // that whatever sets dst_reallocated also updates it.
    if (syncp->dst_reallocated) {
      consume_iter = basep->dst;
      syncp->dst_reallocated = 0;
    }
    // Publish how far we've consumed.
    syncp->consume_tail = consume_iter;
    if ((consume_iter != available_end) || cur_circular_end) {
      // Bytes are ready: up to the wraparound point if one is pending,
      // otherwise up to available_end.
      if (cur_circular_end) {
        basep->consume_stop = cur_circular_end;
      } else {
        basep->consume_stop = available_end;
      }
      LeaveCriticalSection(critical_sectionp);
      // We could set the consumer_progress event here, but it's not really
      // necessary?
      // SetEvent(consumer_progress_event);
      return kPglRetSuccess;
    }
    SetEvent(consumer_progress_event);
    LeaveCriticalSection(critical_sectionp);
    // We've processed all the consume-ready bytes...
    if (reterr != kPglRetSuccess) {
      // ...and we're at eof. consumer_progress_event was already set just
      // above; no further signal is sent here, since cleanup or
      // rewind/retarget will set it again.
      basep->reterr = kPglRetEof;
      return kPglRetEof;
    }
    // ...and there's probably more.
    WaitForSingleObject(syncp->reader_progress_event, INFINITE);
    // bugfix (2 Oct 2018)
    // The reader thread may have moved consume_tail (e.g. back to the start
    // of the buffer before a memmove), so resynchronize with it.
    // NOTE(review): unlike the POSIX branch (which holds the mutex again
    // after pthread_cond_wait), this read happens outside the critical
    // section.
    consume_iter = syncp->consume_tail;
    basep->consume_iter = consume_iter;
  }
#else
  pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
  pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
  pthread_cond_t* reader_progress_condvarp = &syncp->reader_progress_condvar;
  pthread_mutex_lock(sync_mutexp);
  // The mutex is held at the top of every iteration (it is reacquired by
  // pthread_cond_wait() at the bottom of the loop).
  while (1) {
    const PglErr reterr = syncp->reterr;
    if (unlikely((reterr != kPglRetSuccess) && (reterr != kPglRetEof))) {
      basep->errmsg = syncp->errmsg;
      pthread_mutex_unlock(sync_mutexp);
      basep->reterr = reterr;
      return reterr;
    }
    char* available_end = syncp->available_end;
    // bugfix (2 Oct 2018): There were TWO consume_iter == available_end ==
    // cur_circular_end cases.
    // printf("checking for more to consume: %lx %lx %lx\n", (uintptr_t)consume_iter, (uintptr_t)syncp->cur_circular_end, (uintptr_t)available_end);
    // Reached the wraparound point: resume consuming at the start of the
    // buffer.
    if (consume_iter == syncp->cur_circular_end) {
      char* buf = basep->dst;
      consume_iter = buf;
      basep->consume_iter = buf;
      syncp->cur_circular_end = nullptr;
      // File-reader could be waiting on either "all bytes in front have been
      // consumed, some bytes behind may remain" or "all bytes have been
      // consumed". Signal in case it's the first.
      if (consume_iter != available_end) {
        syncp->consumer_progress_state = 1;
        pthread_cond_signal(consumer_progress_condvarp);
      }
    }
    // NOTE(review): basep->consume_iter is not assigned on this path; confirm
    // that whatever sets dst_reallocated also updates it.
    if (syncp->dst_reallocated) {
      consume_iter = basep->dst;
      syncp->dst_reallocated = 0;
    }
    // Publish how far we've consumed.
    syncp->consume_tail = consume_iter;
    // If cur_circular_end is still non-null here, there must be bytes
    // available even when consume_iter == available_end. (Is the latter
    // still possible? Check this.)
    if ((consume_iter != available_end) || syncp->cur_circular_end) {
      if (syncp->cur_circular_end) {
        basep->consume_stop = syncp->cur_circular_end;
      } else {
        basep->consume_stop = available_end;
      }
      // pthread_cond_signal(consumer_progress_condvarp);
      pthread_mutex_unlock(sync_mutexp);
      // printf("consuming %lx..%lx\n", (uintptr_t)(*consume_iterp), (uintptr_t)rlsp->consume_stop);
      return kPglRetSuccess;
    }
    // We've processed all the consume-ready bytes...
    if (reterr != kPglRetSuccess) {
      // ...and we're at eof.
      pthread_mutex_unlock(sync_mutexp);
      basep->reterr = kPglRetEof;
      return kPglRetEof;
    }
    // ...and there's probably more.
    syncp->consumer_progress_state = 1;
    pthread_cond_signal(consumer_progress_condvarp);
    // no need for an explicit spurious-wakeup check, we'll check the progress
    // condition (available_end has advanced, or we have a read error) anyway
    // and get back here if it isn't satisfied
    pthread_cond_wait(reader_progress_condvarp, sync_mutexp);
    // bugfix (2 Oct 2018)
    // The reader thread may have moved consume_tail, so resynchronize.
    consume_iter = syncp->consume_tail;
    basep->consume_iter = syncp->consume_tail;
  }
#endif
}
1599
TextOnlyEmptyLinesLeft(TextStream * txs_ptr)1600 PglErr TextOnlyEmptyLinesLeft(TextStream* txs_ptr) {
1601 TextFileBase* basep = &GetTxsp(txs_ptr)->base;
1602 char* line_start = basep->consume_iter;
1603 while (1) {
1604 if (line_start == basep->consume_stop) {
1605 basep->consume_iter = line_start;
1606 PglErr reterr = TextAdvance(txs_ptr);
1607 if (reterr) {
1608 return reterr;
1609 }
1610 line_start = basep->consume_iter;
1611 }
1612 line_start = FirstNonTspace(line_start);
1613 if (unlikely(!IsEolnKns(*line_start))) {
1614 basep->reterr = kPglRetMalformedInput;
1615 basep->errmsg = kShortErrInteriorEmptyLine;
1616 return kPglRetMalformedInput;
1617 }
1618 line_start = AdvPastDelim(line_start, '\n');
1619 }
1620 }
1621
// Skips the next skip_ct lines.  It advances basep->consume_iter to just
// past the skip_ct-th '\n' at or after it, and calls TextAdvance() whenever
// the current consume range is exhausted.
//
// skip_ct is presumably required to be nonzero ("Nz").  The LP64 path relies
// on this: with skip_ct == 0 the first comparison below always succeeds and
// skip_ct - 1 underflows.
//
// Returns kPglRetSuccess, or TextAdvance()'s nonzero status (kPglRetEof if
// the stream ends before skip_ct lines were skipped, or a read error).
PglErr TextSkipNz(uintptr_t skip_ct, TextStream* txs_ptr) {
  TextFileBase* basep = &GetTxsp(txs_ptr)->base;
#ifdef __LP64__
  char* consume_iter = basep->consume_iter;
  // Minor extension of AdvToNthDelimChecked().
  // Scans one aligned vector at a time, counting '\n' bytes with
  // compare + movemask + popcount.
  const VecUc vvec_all_lf = vecuc_set1('\n');
  while (1) {
    uintptr_t starting_addr = R_CAST(uintptr_t, consume_iter);
    VecUc* consume_viter = R_CAST(VecUc*, RoundDownPow2(starting_addr, kBytesPerVec));
    uintptr_t ending_addr = R_CAST(uintptr_t, basep->consume_stop);
    VecUc* consume_vstop = R_CAST(VecUc*, RoundDownPow2(ending_addr, kBytesPerVec));
    // NOTE(review): the first and last vector loads may cover bytes before
    // consume_iter or at/after consume_stop.  Those bytes are masked out,
    // but reading them presumably relies on the buffer having readable
    // slack -- confirm.
    VecUc cur_vvec = *consume_viter;
    VecUc lf_vvec = (cur_vvec == vvec_all_lf);
    // Bit i of lf_bytes is set iff byte i of the vector is '\n'.
    uint32_t lf_bytes = vecuc_movemask(lf_vvec);
    // Ignore bytes before consume_iter in the first vector.
    const uint32_t leading_byte_ct = starting_addr - R_CAST(uintptr_t, consume_viter);
    const uint32_t leading_mask = UINT32_MAX << leading_byte_ct;
    lf_bytes &= leading_mask;
    uint32_t cur_lf_ct;
    // Vectors lying entirely before the one containing consume_stop.
    for (; consume_viter != consume_vstop; ) {
      cur_lf_ct = PopcountVec8thUint(lf_bytes);
      if (cur_lf_ct >= skip_ct) {
        goto TextSkipNz_finish;
      }
      skip_ct -= cur_lf_ct;
      // bugfix (28 Sep 2019): forgot to update cur_vvec/lf_vvec/lf_bytes?!
      ++consume_viter;
      cur_vvec = *consume_viter;
      lf_vvec = (cur_vvec == vvec_all_lf);
      lf_bytes = vecuc_movemask(lf_vvec);
    }
    // Final (partial) vector: ignore bytes at or after consume_stop.
    lf_bytes &= (1U << (ending_addr % kBytesPerVec)) - 1;
    cur_lf_ct = PopcountVec8thUint(lf_bytes);
    if (cur_lf_ct >= skip_ct) {
    TextSkipNz_finish:
      // The target '\n' is in this vector.  Clear the lowest skip_ct - 1 set
      // bits so the lowest remaining bit marks it, then stop one byte past
      // it.
      lf_bytes = ClearBottomSetBits(skip_ct - 1, lf_bytes);
      const uint32_t byte_offset_in_vec = ctzu32(lf_bytes) + 1;
      const uintptr_t result_addr = R_CAST(uintptr_t, consume_viter) + byte_offset_in_vec;
      basep->consume_iter = R_CAST(char*, result_addr);
      return kPglRetSuccess;
    }
    skip_ct -= cur_lf_ct;
    // bugfix (30 Oct 2019)
    // Whole range consumed without finding enough '\n's; mark it consumed
    // and fetch more.
    basep->consume_iter = basep->consume_stop;
    PglErr reterr = TextAdvance(txs_ptr);
    // not unlikely() due to eof
    if (reterr) {
      return reterr;
    }
    consume_iter = basep->consume_iter;
  }
#else
  // Portable fallback: skip one line at a time.
  char* consume_iter = basep->consume_iter;
  char* consume_stop = basep->consume_stop;
  for (uintptr_t ulii = 0; ulii != skip_ct; ++ulii) {
    if (consume_iter == consume_stop) {
      basep->consume_iter = consume_iter;
      PglErr reterr = TextAdvance(txs_ptr);
      if (reterr) {
        return reterr;
      }
      consume_iter = basep->consume_iter;
      consume_stop = basep->consume_stop;
    }
    consume_iter = AdvPastDelim(consume_iter, '\n');
  }
  basep->consume_iter = consume_iter;
  return kPglRetSuccess;
#endif
}
1691
// Asks the reader thread to switch to new_fname.  When new_fname is nullptr,
// the reader thread's retarget handling rewinds the current file instead.
// The consumer-side view is reset to an empty range at the start of the
// buffer, so the next TextAdvance() call waits for data from the new
// position.
//
// new_fname is not copied.  The reader thread reads syncp->new_fname later,
// so the string must stay valid until the reader has used it.
//
// Returns the reader's pending error (also copied into basep->reterr and
// basep->errmsg) if it is anything other than success or eof.  Otherwise
// clears any eof state and returns kPglRetSuccess.
PglErr TextRetarget(const char* new_fname, TextStream* txs_ptr) {
  TextStreamMain* txsp = GetTxsp(txs_ptr);
  TextFileBase* basep = &txsp->base;
  TextStreamSync* syncp = txsp->syncp;
#ifdef _WIN32
  CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
  EnterCriticalSection(critical_sectionp);
  const PglErr reterr = syncp->reterr;
  if (reterr != kPglRetSuccess) {
    if (unlikely(reterr != kPglRetEof)) {
      basep->errmsg = syncp->errmsg;
      LeaveCriticalSection(critical_sectionp);
      basep->reterr = reterr;
      return reterr;
    }
    // clear eof
    syncp->reterr = kPglRetSuccess;
  }
  basep->reterr = kPglRetSuccess;
  // bugfix (5 Mar 2018): need to reset these here, can't wait for reader
  // thread to receive signal
  char* buf = basep->dst;
  syncp->consume_tail = buf;
  syncp->cur_circular_end = nullptr;
  syncp->available_end = buf;
  syncp->dst_reallocated = 0;
  syncp->interrupt = kTxsInterruptRetarget;
  // Could also just open the file in this function (before acquiring the
  // mutex) and pass a gzFile. Advantages: nothing bad happens if new_fname
  // is overwritten before it's read, RLstreamErrPrint() no longer has to deal
  // with OpenFail error. Disadvantage: peak resource usage is a bit higher if
  // we open the second file before closing the first one. Advantages probably
  // outweigh disadvantages, but I'll wait till --pmerge development to make a
  // decision since that's the main function that actually cares.
  syncp->new_fname = new_fname;
  // Wake the reader thread so it notices the retarget request.
  SetEvent(syncp->consumer_progress_event);
  LeaveCriticalSection(critical_sectionp);
#else
  pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
  pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
  pthread_mutex_lock(sync_mutexp);
  const PglErr reterr = syncp->reterr;
  if (reterr != kPglRetSuccess) {
    if (unlikely(reterr != kPglRetEof)) {
      basep->errmsg = syncp->errmsg;
      pthread_mutex_unlock(sync_mutexp);
      basep->reterr = reterr;
      return reterr;
    }
    // clear eof
    syncp->reterr = kPglRetSuccess;
  }
  // bugfix (4 Oct 2019): also need to clear eof here
  basep->reterr = kPglRetSuccess;
  // Reset the shared buffer state immediately rather than waiting for the
  // reader thread to do it (see the 5 Mar 2018 bugfix note in the Windows
  // branch).
  char* buf = basep->dst;
  syncp->consume_tail = buf;
  syncp->cur_circular_end = nullptr;
  syncp->available_end = buf;
  syncp->dst_reallocated = 0;
  syncp->interrupt = kTxsInterruptRetarget;
  syncp->new_fname = new_fname;
  // Wake the reader thread so it notices the retarget request.
  syncp->consumer_progress_state = 1;
  pthread_cond_signal(consumer_progress_condvarp);
  pthread_mutex_unlock(sync_mutexp);
#endif
  // Nothing is available to consume until the reader produces new data.
  basep->consume_iter = buf;
  basep->consume_stop = buf;
  return kPglRetSuccess;
}
1761
CleanupTextStream(TextStream * txs_ptr,PglErr * reterrp)1762 BoolErr CleanupTextStream(TextStream* txs_ptr, PglErr* reterrp) {
1763 TextStreamMain* txsp = GetTxsp(txs_ptr);
1764 TextFileBase* basep = &txsp->base;
1765 TextStreamSync* syncp = txsp->syncp;
1766 if (syncp) {
1767 #ifdef _WIN32
1768 if (syncp->read_thread) {
1769 CRITICAL_SECTION* critical_sectionp = &syncp->critical_section;
1770 EnterCriticalSection(critical_sectionp);
1771 syncp->interrupt = kTxsInterruptShutdown;
1772 SetEvent(syncp->consumer_progress_event);
1773 LeaveCriticalSection(critical_sectionp);
1774 WaitForSingleObject(syncp->read_thread, INFINITE);
1775 DeleteCriticalSection(critical_sectionp);
1776 CloseHandle(syncp->consumer_progress_event);
1777 CloseHandle(syncp->reader_progress_event);
1778 }
1779 #else
1780 const uint32_t sync_init_state = syncp->sync_init_state;
1781 if (sync_init_state) {
1782 pthread_mutex_t* sync_mutexp = &syncp->sync_mutex;
1783 pthread_cond_t* consumer_progress_condvarp = &syncp->consumer_progress_condvar;
1784 if (sync_init_state == 4) {
1785 pthread_mutex_lock(sync_mutexp);
1786 syncp->interrupt = kTxsInterruptShutdown;
1787 syncp->consumer_progress_state = 1;
1788 pthread_cond_signal(consumer_progress_condvarp);
1789 pthread_mutex_unlock(sync_mutexp);
1790 pthread_join(syncp->read_thread, nullptr);
1791 }
1792 pthread_mutex_destroy(sync_mutexp);
1793 if (sync_init_state > 1) {
1794 pthread_cond_destroy(&syncp->reader_progress_condvar);
1795 if (sync_init_state > 2) {
1796 pthread_cond_destroy(consumer_progress_condvarp);
1797 }
1798 }
1799 }
1800 #endif
1801 aligned_free(txsp->syncp);
1802 txsp->syncp = nullptr;
1803 }
1804 basep->consume_iter = nullptr;
1805 basep->consume_stop = nullptr;
1806 basep->reterr = kPglRetEof;
1807 basep->errmsg = nullptr;
1808 if (basep->dst && (!basep->dst_owned_by_consumer)) {
1809 free(basep->dst);
1810 basep->dst = nullptr;
1811 }
1812 if (basep->ff) {
1813 if (basep->file_type != kFileUncompressed) {
1814 if (basep->file_type == kFileZstd) {
1815 if (txsp->rds.zst.ib.src) {
1816 free_const(txsp->rds.zst.ib.src);
1817 txsp->rds.zst.ib.src = nullptr;
1818 }
1819 if (txsp->rds.zst.ds) {
1820 ZSTD_freeDStream(txsp->rds.zst.ds);
1821 txsp->rds.zst.ds = nullptr;
1822 }
1823 } else if (basep->file_type == kFileBgzf) {
1824 CleanupBgzfRawMtStream(&txsp->rds.bgzf);
1825 } else {
1826 // plain gzip
1827 if (txsp->rds.gz.in) {
1828 free(txsp->rds.gz.in);
1829 txsp->rds.gz.in = nullptr;
1830 }
1831 if (txsp->rds.gz.ds_initialized) {
1832 inflateEnd(&txsp->rds.gz.ds);
1833 }
1834 }
1835 basep->file_type = kFileUncompressed;
1836 }
1837 if (unlikely(fclose_null(&basep->ff))) {
1838 if (!reterrp) {
1839 return 1;
1840 }
1841 if (*reterrp == kPglRetSuccess) {
1842 // Note that we don't set basep->reterr or ->errmsg here.
1843 *reterrp = kPglRetReadFail;
1844 return 1;
1845 }
1846 }
1847 }
1848 return 0;
1849 }
1850
1851
TksNext(TokenStream * tksp,uint32_t shard_ct,char ** shard_boundaries)1852 PglErr TksNext(TokenStream* tksp, uint32_t shard_ct, char** shard_boundaries) {
1853 TextStreamMain* txsp = GetTxsp(&tksp->txs);
1854 txsp->base.consume_iter = txsp->base.consume_stop;
1855 PglErr reterr = TextAdvance(&(tksp->txs));
1856 if (reterr) { // not unlikely due to eof
1857 return reterr;
1858 }
1859 char* consume_iter = txsp->base.consume_iter;
1860 char* consume_stop = txsp->base.consume_stop;
1861 shard_boundaries[0] = consume_iter;
1862 shard_boundaries[shard_ct] = consume_stop;
1863 if (shard_ct > 1) {
1864 const uintptr_t shard_size_target = S_CAST(uintptr_t, consume_stop - consume_iter) / shard_ct;
1865 char* boundary_min = consume_iter;
1866 char* cur_boundary = consume_iter;
1867 for (uint32_t boundary_idx = 1; boundary_idx < shard_ct; ++boundary_idx) {
1868 boundary_min = &(boundary_min[shard_size_target]);
1869 if (boundary_min > cur_boundary) {
1870 // last character must be token separator
1871 cur_boundary = FirstSpaceOrEoln(boundary_min);
1872 ++cur_boundary;
1873 }
1874 shard_boundaries[boundary_idx] = cur_boundary;
1875 }
1876 }
1877 return kPglRetSuccess;
1878 }
1879
1880 #ifdef __cplusplus
1881 }
1882 #endif
1883