1 static char rcsid[] = "$Id: inbuffer.c 219869 2019-08-09 20:07:24Z twu $";
2 #ifdef HAVE_CONFIG_H
3 #include <config.h>
4 #endif
5
6 #include "inbuffer.h"
7 #include <stdio.h>
8 #include <stdlib.h>
9
10 #ifdef HAVE_PTHREAD
11 #ifdef HAVE_SYS_TYPES_H
12 #include <sys/types.h> /* Needed to define pthread_t on Solaris */
13 #endif
14 #include <pthread.h>
15 #endif
16
17 #include "mem.h"
18
19 #ifdef USE_MPI
20 #include "filestring.h"
21 #endif
22 #ifdef GSNAP
23 #include "shortread.h"
24 #endif
25
26
27 #ifdef DEBUG
28 #define debug(x) x
29 #else
30 #define debug(x)
31 #endif
32
33
34 static bool filter_if_both_p;
35
36 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
37 static MPI_Comm workers_comm;
38 #endif
39
40 #ifndef GSNAP
41 static bool user_pairalign_p;
42 static Sequence_T global_usersegment;
43 #endif
44
45 static int part_modulus;
46 static int part_interval;
47
48 void
Inbuffer_setup(bool filter_if_both_p_in,MPI_Comm workers_comm_in,bool user_pairalign_p_in,Sequence_T global_usersegment_in,int part_modulus_in,int part_interval_in)49 Inbuffer_setup (bool filter_if_both_p_in,
50 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
51 MPI_Comm workers_comm_in,
52 #endif
53 #ifndef GSNAP
54 bool user_pairalign_p_in, Sequence_T global_usersegment_in,
55 #endif
56
57 int part_modulus_in, int part_interval_in) {
58 filter_if_both_p = filter_if_both_p_in;
59
60 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
61 workers_comm = workers_comm_in;
62 #endif
63
64 #ifndef GSNAP
65 user_pairalign_p = user_pairalign_p_in;
66 global_usersegment = global_usersegment_in;
67 #endif
68
69 part_modulus = part_modulus_in;
70 part_interval = part_interval_in;
71
72 return;
73 }
74
75
76
77 #define T Inbuffer_T
78
79 struct T {
80 #ifdef USE_MPI
81 Master_T master;
82 #endif
83 Outbuffer_T outbuffer;
84
85 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
86 MPI_File input;
87 #ifdef GSNAP
88 MPI_File input2;
89 #endif
90
91 #elif (defined(USE_MPI))
92 FILE *input;
93 #ifdef GSNAP
94 FILE *input2;
95 #endif
96
97 #else
98 FILE *input;
99 #ifdef GSNAP
100 FILE *input2;
101 #endif
102 #endif
103
104 #ifdef USE_MPI
105 int myid;
106 char *filecontents1_alloc;
107 char *filecontents1;
108 char *filecontents2_alloc;
109 char *filecontents2;
110 #endif
111
112 #ifdef HAVE_ZLIB
113 gzFile gzipped;
114 gzFile gzipped2;
115 #else
116 void *gzipped;
117 void *gzipped2;
118 #endif
119
120 #ifdef HAVE_BZLIB
121 Bzip2_T bzipped;
122 Bzip2_T bzipped2;
123 #else
124 void *bzipped;
125 void *bzipped2;
126 #endif
127
128 char *read_files_command;
129 char **files;
130 int nfiles;
131 int nextchar;
132
133 #ifdef GSNAP
134 bool interleavedp;
135 #endif
136
137 #if defined(HAVE_PTHREAD)
138 pthread_mutex_t lock;
139 #endif
140
141 #ifndef GSNAP
142 Sequence_T pairalign_segment;
143 #endif
144 Request_T *buffer;
145 unsigned int nspaces;
146 int ptr;
147 int nleft;
148 int inputid;
149 int requestid;
150 };
151
152
153 #ifndef GSNAP
154 T
Inbuffer_cmdline(char * contents,int length)155 Inbuffer_cmdline (char *contents, int length) {
156 T new = (T) MALLOC(sizeof(*new));
157
158 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
159 new->input = (MPI_File) NULL;
160 #else
161 new->input = (FILE *) NULL;
162 #endif
163
164 #ifdef USE_MPI
165 new->filecontents1_alloc = (char *) NULL;
166 new->filecontents2_alloc = (char *) NULL;
167 #endif
168
169 new->files = (char **) NULL;
170 new->nfiles = 0;
171 new->nextchar = '\0';
172 /* new->interleavedp = false; --not in GMAP */
173
174 new->pairalign_segment = (Sequence_T) NULL;
175 new->buffer = (Request_T *) CALLOC(1,sizeof(Request_T));
176
177 new->ptr = 0;
178 new->nleft = 1;
179 new->inputid = 0;
180 new->requestid = 0;
181
182 new->buffer[0] = Request_new(new->requestid++,Sequence_genomic_new(contents,length,/*copyp*/true));
183
184 #if defined(HAVE_PTHREAD)
185 pthread_mutex_init(&new->lock,NULL);
186 #endif
187
188 return new;
189 }
190 #endif
191
192
193 T
Inbuffer_new(int nextchar,int myid,MPI_File input,MPI_File input2,gzFile gzipped,gzFile gzipped2,Bzip2_T bzipped,Bzip2_T bzipped2,bool interleavedp,char * read_files_command,char ** files,int nfiles,unsigned int nspaces)194 Inbuffer_new (int nextchar,
195 #ifdef USE_MPI
196 int myid,
197 #endif
198 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
199 MPI_File input,
200 #else
201 FILE *input,
202 #endif
203
204 #ifdef GSNAP
205 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
206 MPI_File input2,
207 #else
208 FILE *input2,
209 #endif
210 #ifdef HAVE_ZLIB
211 gzFile gzipped, gzFile gzipped2,
212 #endif
213 #ifdef HAVE_BZLIB
214 Bzip2_T bzipped, Bzip2_T bzipped2,
215 #endif
216 bool interleavedp,
217 #endif
218 char *read_files_command, char **files, int nfiles, unsigned int nspaces) {
219
220 T new = (T) MALLOC(sizeof(*new));
221
222 #ifdef USE_MPI
223 new->myid = myid;
224 #endif
225
226 new->input = input;
227 #ifdef GSNAP
228 new->input2 = input2;
229 #ifdef HAVE_ZLIB
230 new->gzipped = gzipped;
231 new->gzipped2 = gzipped2;
232 #else
233 new->gzipped = (void *) NULL;
234 new->gzipped2 = (void *) NULL;
235 #endif
236 #ifdef HAVE_BZLIB
237 new->bzipped = bzipped;
238 new->bzipped2 = bzipped2;
239 #else
240 new->bzipped = (void *) NULL;
241 new->bzipped2 = (void *) NULL;
242 #endif
243 new->interleavedp = interleavedp;
244 #endif
245
246 #ifdef USE_MPI
247 new->filecontents1_alloc = (char *) NULL;
248 new->filecontents2_alloc = (char *) NULL;
249 #endif
250
251 new->read_files_command = read_files_command;
252 new->files = files;
253 new->nfiles = nfiles;
254 new->nextchar = nextchar;
255
256 #if defined(HAVE_PTHREAD)
257 pthread_mutex_init(&new->lock,NULL);
258 #endif
259
260 #ifndef GSNAP
261 new->pairalign_segment = (Sequence_T) NULL;
262 #endif
263 new->buffer = (Request_T *) CALLOC(nspaces,sizeof(Request_T));
264 new->nspaces = nspaces;
265 new->ptr = 0;
266 new->nleft = 0;
267 new->inputid = 0;
268 new->requestid = 0;
269
270 return new;
271 }
272
273 #ifdef USE_MPI
274 void
Inbuffer_set_master(T this,Master_T master)275 Inbuffer_set_master (T this, Master_T master) {
276 this->master = master;
277 return;
278 }
279 #endif
280
281 void
Inbuffer_set_outbuffer(T this,Outbuffer_T outbuffer)282 Inbuffer_set_outbuffer (T this, Outbuffer_T outbuffer) {
283 this->outbuffer = outbuffer;
284 return;
285 }
286
287 void
Inbuffer_free(T * old)288 Inbuffer_free (T *old) {
289 if (*old) {
290 /* No need to close input, since done by Shortread and Sequence read procedures */
291
292 #ifdef USE_MPI
293 FREE_IN((*old)->filecontents1_alloc);
294 FREE_IN((*old)->filecontents2_alloc);
295 #endif
296
297 FREE((*old)->buffer);
298
299 #if defined(HAVE_PTHREAD)
300 pthread_mutex_destroy(&(*old)->lock);
301 #endif
302
303 FREE(*old);
304 }
305 return;
306 }
307
308
309 #ifndef GSNAP
310 /* Can delete when we remove worker_mpi_process from gmap.c */
311 Sequence_T
Inbuffer_read(Sequence_T * pairalign_segment,T this,bool skipp)312 Inbuffer_read (Sequence_T *pairalign_segment, T this, bool skipp) {
313 Sequence_T queryseq;
314
315 debug(printf("Calling Inbuffer_read\n"));
316
317 queryseq = Sequence_read_multifile(&this->nextchar,&this->input,this->read_files_command,&this->files,&this->nfiles);
318 if (skipp == true) {
319 Sequence_free(&queryseq);
320 }
321
322 if (user_pairalign_p == true) {
323 /* assert(this->nspaces == 1) */
324 if (this->pairalign_segment != NULL) {
325 Sequence_free(&this->pairalign_segment);
326 }
327 this->pairalign_segment = Sequence_read_unlimited(&this->nextchar,stdin);
328 debug(printf(" but first reading usersegment, got nextchar %c\n",this->nextchar));
329 }
330
331 this->inputid++;
332
333 *pairalign_segment = this->pairalign_segment;
334 return queryseq;
335 }
336
337 #endif
338
339
340 #ifdef USE_MPI
341 /* Used by rank 0 to communicate with Master_parser thread of rank 0 */
342 /* Returns number of requests read */
343 static unsigned int
fill_buffer_master(T this)344 fill_buffer_master (T this) {
345 unsigned int nread = 0;
346 Shortread_T queryseq1, queryseq2;
347 Filestring_T filestring1, filestring2;
348 bool skipp;
349 #if defined(USE_MPI_FILE_INPUT)
350 MPI_Status status;
351 #endif
352
353 int strlength1, strlength2;
354 int offset_start_1, offset_end_1, offset_start_2, offset_end_2;
355 int nextchar_end;
356 bool donep;
357 #if 0
358 int nchars1, nchars2; /* Doesn't need to be saved as a field in Inbuffer_T. */
359 #endif
360
361 /* Need to receive nextchar_end because of the difference between
362 filecontents end ('\0') and FILE * end (EOF) */
363
364 debug(fprintf(stdout,"Worker %d: accessing parser thread directly. ",this->myid));
365 Master_self_interface(this->master,&this->nextchar,&nextchar_end,
366 &offset_start_1,&offset_start_2,&offset_end_1,&offset_end_2,
367 &filestring1,&filestring2,&donep);
368
369 #if defined(HAVE_ZLIB) && defined(HAVE_BZLIB)
370 if (this->gzipped == NULL && this->bzipped == NULL) {
371 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
372 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
373
374 FREE_IN(this->filecontents1_alloc);
375 FREE_IN(this->filecontents2_alloc);
376 this->filecontents1 = (char *) NULL;
377 this->filecontents2 = (char *) NULL;
378
379 } else {
380 this->filecontents1 = this->filecontents1_alloc = Filestring_extract(&strlength1,filestring1);
381 this->filecontents2 = this->filecontents2_alloc = Filestring_extract(&strlength2,filestring2);
382 debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
383 }
384
385 #elif defined(HAVE_ZLIB)
386 if (this->gzipped == NULL) {
387 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
388 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
389
390 FREE_IN(this->filecontents1_alloc);
391 FREE_IN(this->filecontents2_alloc);
392 this->filecontents1 = (char *) NULL;
393 this->filecontents2 = (char *) NULL;
394
395 } else {
396 this->filecontents1 = this->filecontents1_alloc = Filestring_extract(&strlength1,filestring1);
397 this->filecontents2 = this->filecontents2_alloc = Filestring_extract(&strlength2,filestring2);
398 debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
399 }
400
401 #elif defined(HAVE_BZLIB)
402 if (this->bzipped == NULL) {
403 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
404 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
405
406 FREE_IN(this->filecontents1_alloc);
407 FREE_IN(this->filecontents2_alloc);
408 this->filecontents1 = (char *) NULL;
409 this->filecontents2 = (char *) NULL;
410
411 } else {
412 this->filecontents1 = this->filecontents1_alloc = Filestring_extract(&strlength1,filestring1);
413 this->filecontents2 = this->filecontents2_alloc = Filestring_extract(&strlength2,filestring2);
414 debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
415 }
416
417 #else
418 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
419 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
420
421 FREE_IN(this->filecontents1_alloc);
422 FREE_IN(this->filecontents2_alloc);
423 this->filecontents1 = (char *) NULL;
424 this->filecontents2 = (char *) NULL;
425 #endif
426
427 Filestring_free(&filestring2);
428 Filestring_free(&filestring1);
429
430
431 if (this->filecontents1 == NULL) {
432 #if defined(USE_MPI_FILE_INPUT)
433 MPI_File_seek(this->input,offset_start_1,MPI_SEEK_SET);
434 this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
435 MPI_File_read(this->input,this->filecontents1,offset_end_1 - offset_start_1,MPI_CHAR,&status);
436 this->filecontents1[offset_end_1 - offset_start_1] = '\0';
437
438 if (this->input2 != NULL) {
439 MPI_File_seek(this->input2,offset_start_2,MPI_SEEK_SET);
440 this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 1) * sizeof(char));
441 MPI_File_read(this->input2,this->filecontents2,offset_end_2 - offset_start_2,MPI_CHAR,&status);
442 this->filecontents2[offset_end_2 - offset_start_2] = '\0';
443 }
444
445 #else
446 #ifdef HAVE_FSEEKO
447 fseeko(this->input,offset_start_1,SEEK_SET);
448 #else
449 fseek(this->input,offset_start_1,SEEK_SET);
450 #endif
451 this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
452 fread(this->filecontents1,offset_end_1 - offset_start_1,sizeof(char),this->input);
453 this->filecontents1[offset_end_1 - offset_start_1] = '\0';
454 if (this->input2 != NULL) {
455 #ifdef HAVE_FSEEKO
456 fseeko(this->input2,offset_start_2,SEEK_SET);
457 #else
458 fseek(this->input2,offset_start_2,SEEK_SET);
459 #endif
460 this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 2) * sizeof(char));
461 fread(this->filecontents2,offset_end_2 - offset_start_2,sizeof(char),this->input);
462 this->filecontents2[offset_end_2 - offset_start_2] = '\0';
463 }
464 #endif
465 }
466
467 /* Read from filecontents */
468 while (nread < this->nspaces &&
469 (queryseq1 = Shortread_read_filecontents(&this->nextchar,&queryseq2,
470 &this->filecontents1,&this->filecontents2,&this->input,&this->input2,
471 #ifdef USE_MPI_FILE_INPUT
472 workers_comm,
473 #endif
474 this->interleavedp,
475 this->read_files_command,&this->files,&this->nfiles,
476 skipp = (this->inputid % part_interval != part_modulus))) != NULL) {
477 if (skipp) {
478 #if 0
479 /* Shortread procedures won't allocate in this situation */
480 Shortread_free(&queryseq1);
481 if (queryseq2 != NULL) {
482 Shortread_free(&queryseq2);
483 }
484 #endif
485
486 } else if (filter_if_both_p == true &&
487 Shortread_filterp(queryseq1) == true && (queryseq2 == NULL || Shortread_filterp(queryseq2) == true)) {
488 Shortread_free(&queryseq1);
489 if (queryseq2 != NULL) {
490 Shortread_free(&queryseq2);
491 }
492
493 } else if (filter_if_both_p == false &&
494 (Shortread_filterp(queryseq1) == true || (queryseq2 != NULL && Shortread_filterp(queryseq2) == true))) {
495 Shortread_free(&queryseq1);
496 if (queryseq2 != NULL) {
497 Shortread_free(&queryseq2);
498 }
499
500 } else {
501 this->buffer[nread++] = Request_new(this->requestid++,queryseq1,queryseq2);
502 }
503 this->inputid++;
504 }
505
506 this->nleft = nread;
507 this->ptr = 0;
508
509 /* Need to set this to the FILE * end (EOF at end of file), and not the filecontents end (always '\0') */
510 this->nextchar = nextchar_end;
511
512 #ifdef USE_MPI
513 debug(printf("Worker %d: ",this->myid));
514 #endif
515 debug(printf("this->nextchar (nextchar_end) is %c (%d)\n",this->nextchar,this->nextchar));
516
517 return nread;
518 }
519
520
521
522 /* Used by ranks 1..n to communicate with Master_mpi_interface thread of rank 0 */
523 /* Returns number of requests read */
524 static unsigned int
fill_buffer_slave(T this)525 fill_buffer_slave (T this) {
526 unsigned int nread = 0;
527 Shortread_T queryseq1, queryseq2;
528 bool skipp, donep;
529
530 int strlength1, strlength2;
531 MPI_Status status;
532 int offset_start_1, offset_end_1, offset_start_2, offset_end_2;
533 int nextchar_end;
534 #if 0
535 int nchars1, nchars2; /* Doesn't need to be saved as a field in Inbuffer_T. */
536 #endif
537
538 /* Need to receive nextchar_end because of the difference between
539 filecontents end ('\0') and FILE * end (EOF) */
540
541 debug(fprintf(stdout,"Worker %d: sending notification to master process. ",this->myid));
542 MPI_SEND(&this->nfiles,1,MPI_INT,/*dest*/0,/*tag*/MPI_TAG_WANT_INPUT,MPI_COMM_WORLD);
543 MPI_RECV(&this->nextchar,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
544 MPI_RECV(&nextchar_end,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
545 MPI_RECV(&donep,1,MPI_UNSIGNED_CHAR,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
546
547 #if defined(HAVE_ZLIB) && defined(HAVE_BZLIB)
548 if (this->gzipped == NULL && this->bzipped == NULL) {
549 MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
550 MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
551 MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
552 MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
553 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
554 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
555
556 FREE_IN(this->filecontents1_alloc);
557 FREE_IN(this->filecontents2_alloc);
558 this->filecontents1 = (char *) NULL;
559 this->filecontents2 = (char *) NULL;
560
561 } else {
562 this->filecontents1 = this->filecontents1_alloc =
563 Filestring_recv(&strlength1,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
564 this->filecontents2 = this->filecontents2_alloc =
565 Filestring_recv(&strlength2,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
566 debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
567 }
568
569 #elif defined(HAVE_ZLIB)
570 if (this->gzipped == NULL) {
571 MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
572 MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
573 MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
574 MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
575 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
576 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
577 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d\n",offset_start_1,offset_end_1,offset_start_2,offset_end_2));
578
579 FREE_IN(this->filecontents1_alloc);
580 FREE_IN(this->filecontents2_alloc);
581 this->filecontents1 = (char *) NULL;
582 this->filecontents2 = (char *) NULL;
583
584 } else {
585 this->filecontents1 = this->filecontents1_alloc =
586 Filestring_recv(&strlength1,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
587 this->filecontents2 = this->filecontents2_alloc =
588 Filestring_recv(&strlength2,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
589 debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
590 }
591
592 #elif defined(HAVE_BZLIB)
593 if (this->bzipped == NULL) {
594 MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
595 MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
596 MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
597 MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
598 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
599 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
600 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d\n",offset_start_1,offset_end_1,offset_start_2,offset_end_2));
601
602 FREE_IN(this->filecontents1_alloc);
603 FREE_IN(this->filecontents2_alloc);
604 this->filecontents1 = (char *) NULL;
605 this->filecontents2 = (char *) NULL;
606
607 } else {
608 this->filecontents1 = this->filecontents1_alloc =
609 Filestring_recv(&strlength1,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
610 this->filecontents2 = this->filecontents2_alloc =
611 Filestring_recv(&strlength2,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
612 debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
613 }
614
615 #else
616 MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
617 MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
618 MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
619 MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
620 debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
621 offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
622
623 FREE_IN(this->filecontents1_alloc);
624 FREE_IN(this->filecontents2_alloc);
625 this->filecontents1 = (char *) NULL;
626 this->filecontents2 = (char *) NULL;
627 #endif
628
629
630 if (this->filecontents1 == NULL) {
631 #if defined(USE_MPI_FILE_INPUT)
632 MPI_File_seek(this->input,offset_start_1,MPI_SEEK_SET);
633 this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
634 MPI_File_read(this->input,this->filecontents1,offset_end_1 - offset_start_1,MPI_CHAR,&status);
635 this->filecontents1[offset_end_1 - offset_start_1] = '\0';
636
637 if (this->input2 != NULL) {
638 MPI_File_seek(this->input2,offset_start_2,MPI_SEEK_SET);
639 this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 1) * sizeof(char));
640 MPI_File_read(this->input2,this->filecontents2,offset_end_2 - offset_start_2,MPI_CHAR,&status);
641 this->filecontents2[offset_end_2 - offset_start_2] = '\0';
642 }
643
644 #else
645 #ifdef HAVE_FSEEKO
646 fseeko(this->input,offset_start_1,SEEK_SET);
647 #else
648 fseek(this->input,offset_start_1,SEEK_SET);
649 #endif
650 this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
651 fread(this->filecontents1,offset_end_1 - offset_start_1,sizeof(char),this->input);
652 this->filecontents1[offset_end_1 - offset_start_1] = '\0';
653 if (this->input2 != NULL) {
654 #ifdef HAVE_FSEEKO
655 fseeko(this->input2,offset_start_2,SEEK_SET);
656 #else
657 fseek(this->input2,offset_start_2,SEEK_SET);
658 #endif
659 this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 2) * sizeof(char));
660 fread(this->filecontents2,offset_end_2 - offset_start_2,sizeof(char),this->input);
661 this->filecontents2[offset_end_2 - offset_start_2] = '\0';
662 }
663 #endif
664 }
665
666 /* Read from filecontents */
667 while (nread < this->nspaces &&
668 (queryseq1 = Shortread_read_filecontents(&this->nextchar,&queryseq2,
669 &this->filecontents1,&this->filecontents2,&this->input,&this->input2,
670 #ifdef USE_MPI_FILE_INPUT
671 workers_comm,
672 #endif
673 this->read_files_command,&this->files,&this->nfiles,
674 skipp = (this->inputid % part_interval != part_modulus))) != NULL) {
675 if (skipp) {
676 #if 0
677 /* Shortread procedures won't allocate in this situation */
678 Shortread_free(&queryseq1);
679 if (queryseq2 != NULL) {
680 Shortread_free(&queryseq2);
681 }
682 #endif
683
684 } else if (filter_if_both_p == true &&
685 Shortread_filterp(queryseq1) == true && (queryseq2 == NULL || Shortread_filterp(queryseq2) == true)) {
686 Shortread_free(&queryseq1);
687 if (queryseq2 != NULL) {
688 Shortread_free(&queryseq2);
689 }
690
691 } else if (filter_if_both_p == false &&
692 (Shortread_filterp(queryseq1) == true || (queryseq2 != NULL && Shortread_filterp(queryseq2) == true))) {
693 Shortread_free(&queryseq1);
694 if (queryseq2 != NULL) {
695 Shortread_free(&queryseq2);
696 }
697
698 } else {
699 this->buffer[nread++] = Request_new(this->requestid++,queryseq1,queryseq2);
700 }
701 this->inputid++;
702 }
703
704 this->nleft = nread;
705 this->ptr = 0;
706
707 /* Need to set this to the FILE * end (EOF at end of file), and not the filecontents end (always '\0') */
708 this->nextchar = nextchar_end;
709
710 #ifdef USE_MPI
711 debug(printf("Worker %d: ",this->myid));
712 #endif
713 debug(printf("this->nextchar (nextchar_end) is %c (%d)\n",this->nextchar,this->nextchar));
714
715 return nread;
716 }
717
718 #elif defined(GSNAP)
719
720 /* Returns number of requests read */
721 static unsigned int
fill_buffer(T this)722 fill_buffer (T this) {
723 unsigned int nread = 0;
724 Shortread_T queryseq1, queryseq2;
725 bool skipp;
726 int nchars1 = 0, nchars2 = 0; /* Returned only because MPI master needs it. Doesn't need to be saved as a field in Inbuffer_T. */
727
728 /* fprintf(stderr,"Entered fill_buffer\n"); */
729 while (nread < this->nspaces &&
730 (queryseq1 = Shortread_read(&this->nextchar,&nchars1,&nchars2,&queryseq2,
731 &this->input,&this->input2,
732 #ifdef HAVE_ZLIB
733 &this->gzipped,&this->gzipped2,
734 #endif
735 #ifdef HAVE_BZLIB
736 &this->bzipped,&this->bzipped2,
737 #endif
738 this->interleavedp,
739 this->read_files_command,&this->files,&this->nfiles,
740 skipp = (this->inputid % part_interval != part_modulus))) != NULL) {
741 if (skipp) {
742 #if 0
743 /* Shortread procedures won't allocate in this situation */
744 Shortread_free(&queryseq1);
745 if (queryseq2 != NULL) {
746 Shortread_free(&queryseq2);
747 }
748 #endif
749
750 } else if (filter_if_both_p == true &&
751 Shortread_filterp(queryseq1) == true && (queryseq2 == NULL || Shortread_filterp(queryseq2) == true)) {
752 Shortread_free(&queryseq1);
753 if (queryseq2 != NULL) {
754 Shortread_free(&queryseq2);
755 }
756
757 } else if (filter_if_both_p == false &&
758 (Shortread_filterp(queryseq1) == true || (queryseq2 != NULL && Shortread_filterp(queryseq2) == true))) {
759 Shortread_free(&queryseq1);
760 if (queryseq2 != NULL) {
761 Shortread_free(&queryseq2);
762 }
763
764 } else {
765 this->buffer[nread++] = Request_new(this->requestid++,queryseq1,queryseq2);
766 }
767 this->inputid++;
768 }
769 /* fprintf(stderr,"Read %d reads\n",nread); */
770
771 this->nleft = nread;
772 this->ptr = 0;
773
774 return nread;
775 }
776
777 #else
778
779 /* GMAP version */
780 /* Returns number of requests read */
781 static unsigned int
fill_buffer(T this)782 fill_buffer (T this) {
783 unsigned int nread = 0;
784 #if 0
785 unsigned int nchars = 0U;
786 #endif
787 Sequence_T queryseq;
788
789 while (nread < this->nspaces &&
790 (queryseq = Sequence_read_multifile(&this->nextchar,&this->input,
791 this->read_files_command,&this->files,&this->nfiles)) != NULL) {
792 if (this->inputid % part_interval != part_modulus) {
793 Sequence_free(&queryseq);
794 } else {
795 debug(printf("inbuffer creating request %d\n",this->requestid));
796 this->buffer[nread++] = Request_new(this->requestid++,queryseq);
797 #if 0
798 nchars += Sequence_fulllength(queryseq);
799 #endif
800 }
801 this->inputid++;
802 }
803
804 this->nleft = nread;
805 this->ptr = 0;
806
807 return nread;
808 }
809
810 #endif
811
812
813 #ifndef USE_MPI
814 /* No need to lock, since only main thread calls */
815 /* Returns nread to give to Outbuffer_new */
816 unsigned int
Inbuffer_fill_init(T this)817 Inbuffer_fill_init (T this) {
818 unsigned int nread;
819
820 debug(printf("inbuffer filling initially\n"));
821 nread = fill_buffer(this);
822 debug(printf("inbuffer read %d sequences\n",nread));
823
824 return nread;
825 }
826 #endif
827
828
829 Request_T
830 #ifdef GSNAP
Inbuffer_get_request(T this)831 Inbuffer_get_request (T this)
832 #else
833 Inbuffer_get_request (Sequence_T *pairalign_segment, T this)
834 #endif
835 {
836 Request_T request;
837 unsigned int nread;
838
839 debug(printf("Calling Inbuffer_get_request\n"));
840
841 #if defined(HAVE_PTHREAD)
842 pthread_mutex_lock(&this->lock);
843 #endif
844
845 if (this->nleft > 0) {
846 request = this->buffer[this->ptr++];
847 this->nleft -= 1;
848
849 #if 0
850 } else if (this->nextchar == EOF) {
851 /* Causes --force-single-end to fail when reads in a file are a multiple of nspaces */
852 /* Want to call fill_buffer to find out if the input is exhausted */
853 Outbuffer_add_nread(this->outbuffer,/*nread*/0);
854 request = NULL;
855 #endif
856
857 } else {
858 #ifdef USE_MPI
859 debug(printf("Worker %d: ",this->myid));
860 #endif
861 debug(printf("inbuffer filling with nextchar %c (%d)\n",this->nextchar,this->nextchar));
862
863 #ifndef GSNAP
864 if (user_pairalign_p == true) {
865 /* assert(this->nspaces == 1) */
866 if (this->pairalign_segment != NULL) {
867 Sequence_free(&this->pairalign_segment);
868 }
869 this->pairalign_segment = Sequence_read_unlimited(&this->nextchar,stdin);
870 debug(printf(" but first reading usersegment, got nextchar %c\n",this->nextchar));
871 }
872 #endif
873
874 #ifdef USE_MPI
875 if (this->myid == 0) {
876 nread = fill_buffer_master(this);
877 } else {
878 nread = fill_buffer_slave(this);
879 }
880 #else
881 nread = fill_buffer(this);
882 #endif
883
884 Outbuffer_add_nread(this->outbuffer,nread);
885 #ifdef USE_MPI
886 debug(printf("Worker %d: ",this->myid));
887 #endif
888 debug(printf("inbuffer read %d sequences\n",nread));
889
890 if (nread == 0) {
891 /* Still empty */
892 request = NULL;
893 } else {
894 request = this->buffer[this->ptr++];
895 this->nleft -= 1;
896 }
897 }
898
899 #ifndef GSNAP
900 *pairalign_segment = this->pairalign_segment;
901 #endif
902
903 #if defined(HAVE_PTHREAD)
904 pthread_mutex_unlock(&this->lock);
905 #endif
906
907 return request;
908 }
909
910
911 #ifndef GSNAP
912 /* Same as Inbuffer_get_request, but leaves sequence in buffer. Used by GMAP for selfalign feature. */
913 Request_T
Inbuffer_first_request(T this)914 Inbuffer_first_request (T this) {
915 Request_T request;
916 unsigned int nread;
917
918 debug(printf("Calling Inbuffer_first_request\n"));
919
920 #if defined(HAVE_PTHREAD)
921 pthread_mutex_lock(&this->lock);
922 #endif
923
924 if (this->nleft > 0) {
925 request = this->buffer[this->ptr/*++*/];
926 /* this->nleft -= 1; */
927
928 } else {
929 debug(printf("inbuffer filling\n"));
930 nread = fill_buffer(this);
931 Outbuffer_add_nread(this->outbuffer,nread);
932 debug(printf("inbuffer read %d sequences\n",nread));
933
934 if (nread == 0) {
935 /* Still empty */
936 request = NULL;
937 } else {
938 request = this->buffer[this->ptr/*++*/];
939 /* this->nleft -= 1; */
940 }
941 }
942
943 #if defined(HAVE_PTHREAD)
944 pthread_mutex_unlock(&this->lock);
945 #endif
946
947 return request;
948 }
949 #endif
950
951
952