1 static char rcsid[] = "$Id: inbuffer.c 219869 2019-08-09 20:07:24Z twu $";
2 #ifdef HAVE_CONFIG_H
3 #include <config.h>
4 #endif
5 
6 #include "inbuffer.h"
7 #include <stdio.h>
8 #include <stdlib.h>
9 
10 #ifdef HAVE_PTHREAD
11 #ifdef HAVE_SYS_TYPES_H
12 #include <sys/types.h>		/* Needed to define pthread_t on Solaris */
13 #endif
14 #include <pthread.h>
15 #endif
16 
17 #include "mem.h"
18 
19 #ifdef USE_MPI
20 #include "filestring.h"
21 #endif
22 #ifdef GSNAP
23 #include "shortread.h"
24 #endif
25 
26 
27 #ifdef DEBUG
28 #define debug(x) x
29 #else
30 #define debug(x)
31 #endif
32 
33 
/* Module-wide settings shared by every Inbuffer_T; all are assigned by
   Inbuffer_setup and read by the fill routines below. */

/* true: drop a read (pair) only when every end fails Shortread_filterp;
   false: drop it when any end fails Shortread_filterp */
static bool filter_if_both_p;

#if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
/* Communicator handed to Shortread_read_filecontents when reading via MPI file I/O */
static MPI_Comm workers_comm;
#endif

#ifndef GSNAP
/* When true, a pairalign segment is read from stdin before each refill */
static bool user_pairalign_p;
/* Stored by Inbuffer_setup; not referenced elsewhere in the visible code */
static Sequence_T global_usersegment;
#endif

/* Input records are processed only when inputid % part_interval == part_modulus;
   all other records are skipped */
static int part_modulus;
static int part_interval;
47 
48 void
Inbuffer_setup(bool filter_if_both_p_in,MPI_Comm workers_comm_in,bool user_pairalign_p_in,Sequence_T global_usersegment_in,int part_modulus_in,int part_interval_in)49 Inbuffer_setup (bool filter_if_both_p_in,
50 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
51 		MPI_Comm workers_comm_in,
52 #endif
53 #ifndef GSNAP
54 		bool user_pairalign_p_in, Sequence_T global_usersegment_in,
55 #endif
56 
57 		int part_modulus_in, int part_interval_in) {
58   filter_if_both_p = filter_if_both_p_in;
59 
60 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
61   workers_comm = workers_comm_in;
62 #endif
63 
64 #ifndef GSNAP
65   user_pairalign_p = user_pairalign_p_in;
66   global_usersegment = global_usersegment_in;
67 #endif
68 
69   part_modulus = part_modulus_in;
70   part_interval = part_interval_in;
71 
72   return;
73 }
74 
75 
76 
#define T Inbuffer_T

/* Buffer of pending requests, refilled from the input stream(s) in batches
   of up to nspaces.  When pthreads are available, Inbuffer_get_request and
   Inbuffer_first_request hold lock while touching it. */
struct T {
#ifdef USE_MPI
  Master_T master;		/* Rank 0 obtains input chunks through this (Master_self_interface) */
#endif
  Outbuffer_T outbuffer;	/* Told the size of each refill via Outbuffer_add_nread */

#if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
  MPI_File input;
#ifdef GSNAP
  MPI_File input2;		/* Second input file; compared against NULL to detect its absence */
#endif

#elif (defined(USE_MPI))
  FILE *input;
#ifdef GSNAP
  FILE *input2;
#endif

#else
  FILE *input;
#ifdef GSNAP
  FILE *input2;
#endif
#endif

#ifdef USE_MPI
  int myid;			/* MPI rank: 0 uses fill_buffer_master, others fill_buffer_slave */
  char *filecontents1_alloc;	/* Owned block holding the current input-1 chunk; released with FREE_IN */
  char *filecontents1;		/* Read position in that chunk, passed by address to Shortread_read_filecontents */
  char *filecontents2_alloc;	/* Same as above, for input 2 */
  char *filecontents2;
#endif

#ifdef HAVE_ZLIB
  gzFile gzipped;
  gzFile gzipped2;
#else
  void *gzipped;		/* Placeholder so the field exists without zlib */
  void *gzipped2;
#endif

#ifdef HAVE_BZLIB
  Bzip2_T bzipped;
  Bzip2_T bzipped2;
#else
  void *bzipped;		/* Placeholder so the field exists without bzlib */
  void *bzipped2;
#endif

  char *read_files_command;	/* Passed through to the sequence readers */
  char **files;			/* Input file list, passed by address to the readers (presumably advanced by them) */
  int nfiles;
  int nextchar;			/* Lookahead character carried between reads */

#ifdef GSNAP
  bool interleavedp;		/* Passed through to the short-read parser */
#endif

#if defined(HAVE_PTHREAD)
  pthread_mutex_t lock;
#endif

#ifndef GSNAP
  Sequence_T pairalign_segment;	/* Latest segment read from stdin when user_pairalign_p */
#endif
  Request_T *buffer;		/* Request slots, allocated with nspaces entries */
  unsigned int nspaces;		/* Capacity of buffer; the fill routines stop at this count */
  int ptr;			/* Index of the next request to hand out */
  int nleft;			/* Buffered requests not yet handed out */
  int inputid;			/* Input records read so far (kept or not); drives part_modulus selection */
  int requestid;		/* Id given to the next Request_new */
};
151 
152 
153 #ifndef GSNAP
154 T
Inbuffer_cmdline(char * contents,int length)155 Inbuffer_cmdline (char *contents, int length) {
156   T new = (T) MALLOC(sizeof(*new));
157 
158 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
159   new->input = (MPI_File) NULL;
160 #else
161   new->input = (FILE *) NULL;
162 #endif
163 
164 #ifdef USE_MPI
165   new->filecontents1_alloc = (char *) NULL;
166   new->filecontents2_alloc = (char *) NULL;
167 #endif
168 
169   new->files = (char **) NULL;
170   new->nfiles = 0;
171   new->nextchar = '\0';
172   /* new->interleavedp = false; --not in GMAP */
173 
174   new->pairalign_segment = (Sequence_T) NULL;
175   new->buffer = (Request_T *) CALLOC(1,sizeof(Request_T));
176 
177   new->ptr = 0;
178   new->nleft = 1;
179   new->inputid = 0;
180   new->requestid = 0;
181 
182   new->buffer[0] = Request_new(new->requestid++,Sequence_genomic_new(contents,length,/*copyp*/true));
183 
184 #if defined(HAVE_PTHREAD)
185   pthread_mutex_init(&new->lock,NULL);
186 #endif
187 
188   return new;
189 }
190 #endif
191 
192 
193 T
Inbuffer_new(int nextchar,int myid,MPI_File input,MPI_File input2,gzFile gzipped,gzFile gzipped2,Bzip2_T bzipped,Bzip2_T bzipped2,bool interleavedp,char * read_files_command,char ** files,int nfiles,unsigned int nspaces)194 Inbuffer_new (int nextchar,
195 #ifdef USE_MPI
196 	      int myid,
197 #endif
198 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
199 	      MPI_File input,
200 #else
201 	      FILE *input,
202 #endif
203 
204 #ifdef GSNAP
205 #if defined(USE_MPI) && defined(USE_MPI_FILE_INPUT)
206 	      MPI_File input2,
207 #else
208 	      FILE *input2,
209 #endif
210 #ifdef HAVE_ZLIB
211 	      gzFile gzipped, gzFile gzipped2,
212 #endif
213 #ifdef HAVE_BZLIB
214 	      Bzip2_T bzipped, Bzip2_T bzipped2,
215 #endif
216 	      bool interleavedp,
217 #endif
218 	      char *read_files_command, char **files, int nfiles, unsigned int nspaces) {
219 
220   T new = (T) MALLOC(sizeof(*new));
221 
222 #ifdef USE_MPI
223   new->myid = myid;
224 #endif
225 
226   new->input = input;
227 #ifdef GSNAP
228   new->input2 = input2;
229 #ifdef HAVE_ZLIB
230   new->gzipped = gzipped;
231   new->gzipped2 = gzipped2;
232 #else
233   new->gzipped = (void *) NULL;
234   new->gzipped2 = (void *) NULL;
235 #endif
236 #ifdef HAVE_BZLIB
237   new->bzipped = bzipped;
238   new->bzipped2 = bzipped2;
239 #else
240   new->bzipped = (void *) NULL;
241   new->bzipped2 = (void *) NULL;
242 #endif
243   new->interleavedp = interleavedp;
244 #endif
245 
246 #ifdef USE_MPI
247   new->filecontents1_alloc = (char *) NULL;
248   new->filecontents2_alloc = (char *) NULL;
249 #endif
250 
251   new->read_files_command = read_files_command;
252   new->files = files;
253   new->nfiles = nfiles;
254   new->nextchar = nextchar;
255 
256 #if defined(HAVE_PTHREAD)
257   pthread_mutex_init(&new->lock,NULL);
258 #endif
259 
260 #ifndef GSNAP
261   new->pairalign_segment = (Sequence_T) NULL;
262 #endif
263   new->buffer = (Request_T *) CALLOC(nspaces,sizeof(Request_T));
264   new->nspaces = nspaces;
265   new->ptr = 0;
266   new->nleft = 0;
267   new->inputid = 0;
268   new->requestid = 0;
269 
270   return new;
271 }
272 
#ifdef USE_MPI
/* Attaches the Master_T through which rank 0 pulls its input chunks
   (used by fill_buffer_master via Master_self_interface). */
void
Inbuffer_set_master (T this, Master_T master) {
  this->master = master;
}
#endif
280 
281 void
Inbuffer_set_outbuffer(T this,Outbuffer_T outbuffer)282 Inbuffer_set_outbuffer (T this, Outbuffer_T outbuffer) {
283   this->outbuffer = outbuffer;
284   return;
285 }
286 
287 void
Inbuffer_free(T * old)288 Inbuffer_free (T *old) {
289   if (*old) {
290     /* No need to close input, since done by Shortread and Sequence read procedures */
291 
292 #ifdef USE_MPI
293     FREE_IN((*old)->filecontents1_alloc);
294     FREE_IN((*old)->filecontents2_alloc);
295 #endif
296 
297     FREE((*old)->buffer);
298 
299 #if defined(HAVE_PTHREAD)
300     pthread_mutex_destroy(&(*old)->lock);
301 #endif
302 
303     FREE(*old);
304   }
305   return;
306 }
307 
308 
309 #ifndef GSNAP
310 /* Can delete when we remove worker_mpi_process from gmap.c */
311 Sequence_T
Inbuffer_read(Sequence_T * pairalign_segment,T this,bool skipp)312 Inbuffer_read (Sequence_T *pairalign_segment, T this, bool skipp) {
313   Sequence_T queryseq;
314 
315   debug(printf("Calling Inbuffer_read\n"));
316 
317   queryseq = Sequence_read_multifile(&this->nextchar,&this->input,this->read_files_command,&this->files,&this->nfiles);
318   if (skipp == true) {
319     Sequence_free(&queryseq);
320   }
321 
322   if (user_pairalign_p == true) {
323     /* assert(this->nspaces == 1) */
324     if (this->pairalign_segment != NULL) {
325       Sequence_free(&this->pairalign_segment);
326     }
327     this->pairalign_segment = Sequence_read_unlimited(&this->nextchar,stdin);
328     debug(printf("  but first reading usersegment, got nextchar %c\n",this->nextchar));
329   }
330 
331   this->inputid++;
332 
333   *pairalign_segment = this->pairalign_segment;
334   return queryseq;
335 }
336 
337 #endif
338 
339 
340 #ifdef USE_MPI
341 /* Used by rank 0 to communicate with Master_parser thread of rank 0 */
342 /* Returns number of requests read */
343 static unsigned int
fill_buffer_master(T this)344 fill_buffer_master (T this) {
345   unsigned int nread = 0;
346   Shortread_T queryseq1, queryseq2;
347   Filestring_T filestring1, filestring2;
348   bool skipp;
349 #if defined(USE_MPI_FILE_INPUT)
350   MPI_Status status;
351 #endif
352 
353   int strlength1, strlength2;
354   int offset_start_1, offset_end_1, offset_start_2, offset_end_2;
355   int nextchar_end;
356   bool donep;
357 #if 0
358   int nchars1, nchars2;		/* Doesn't need to be saved as a field in Inbuffer_T. */
359 #endif
360 
361   /* Need to receive nextchar_end because of the difference between
362      filecontents end ('\0') and FILE * end (EOF) */
363 
364   debug(fprintf(stdout,"Worker %d: accessing parser thread directly.  ",this->myid));
365   Master_self_interface(this->master,&this->nextchar,&nextchar_end,
366 			&offset_start_1,&offset_start_2,&offset_end_1,&offset_end_2,
367 			&filestring1,&filestring2,&donep);
368 
369 #if defined(HAVE_ZLIB) && defined(HAVE_BZLIB)
370   if (this->gzipped == NULL && this->bzipped == NULL) {
371     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
372 		  offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
373 
374     FREE_IN(this->filecontents1_alloc);
375     FREE_IN(this->filecontents2_alloc);
376     this->filecontents1 = (char *) NULL;
377     this->filecontents2 = (char *) NULL;
378 
379   } else {
380     this->filecontents1 = this->filecontents1_alloc = Filestring_extract(&strlength1,filestring1);
381     this->filecontents2 = this->filecontents2_alloc = Filestring_extract(&strlength2,filestring2);
382     debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
383   }
384 
385 #elif defined(HAVE_ZLIB)
386   if (this->gzipped == NULL) {
387     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
388 		  offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
389 
390     FREE_IN(this->filecontents1_alloc);
391     FREE_IN(this->filecontents2_alloc);
392     this->filecontents1 = (char *) NULL;
393     this->filecontents2 = (char *) NULL;
394 
395   } else {
396     this->filecontents1 = this->filecontents1_alloc = Filestring_extract(&strlength1,filestring1);
397     this->filecontents2 = this->filecontents2_alloc = Filestring_extract(&strlength2,filestring2);
398     debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
399   }
400 
401 #elif defined(HAVE_BZLIB)
402   if (this->bzipped == NULL) {
403     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
404 		  offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
405 
406     FREE_IN(this->filecontents1_alloc);
407     FREE_IN(this->filecontents2_alloc);
408     this->filecontents1 = (char *) NULL;
409     this->filecontents2 = (char *) NULL;
410 
411   } else {
412     this->filecontents1 = this->filecontents1_alloc = Filestring_extract(&strlength1,filestring1);
413     this->filecontents2 = this->filecontents2_alloc = Filestring_extract(&strlength2,filestring2);
414     debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
415   }
416 
417 #else
418   debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
419 		offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
420 
421   FREE_IN(this->filecontents1_alloc);
422   FREE_IN(this->filecontents2_alloc);
423   this->filecontents1 = (char *) NULL;
424   this->filecontents2 = (char *) NULL;
425 #endif
426 
427   Filestring_free(&filestring2);
428   Filestring_free(&filestring1);
429 
430 
431   if (this->filecontents1 == NULL) {
432 #if defined(USE_MPI_FILE_INPUT)
433     MPI_File_seek(this->input,offset_start_1,MPI_SEEK_SET);
434     this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
435     MPI_File_read(this->input,this->filecontents1,offset_end_1 - offset_start_1,MPI_CHAR,&status);
436     this->filecontents1[offset_end_1 - offset_start_1] = '\0';
437 
438     if (this->input2 != NULL) {
439       MPI_File_seek(this->input2,offset_start_2,MPI_SEEK_SET);
440       this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 1) * sizeof(char));
441       MPI_File_read(this->input2,this->filecontents2,offset_end_2 - offset_start_2,MPI_CHAR,&status);
442       this->filecontents2[offset_end_2 - offset_start_2] = '\0';
443     }
444 
445 #else
446 #ifdef HAVE_FSEEKO
447     fseeko(this->input,offset_start_1,SEEK_SET);
448 #else
449     fseek(this->input,offset_start_1,SEEK_SET);
450 #endif
451     this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
452     fread(this->filecontents1,offset_end_1 - offset_start_1,sizeof(char),this->input);
453     this->filecontents1[offset_end_1 - offset_start_1] = '\0';
454     if (this->input2 != NULL) {
455 #ifdef HAVE_FSEEKO
456       fseeko(this->input2,offset_start_2,SEEK_SET);
457 #else
458       fseek(this->input2,offset_start_2,SEEK_SET);
459 #endif
460       this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 2) * sizeof(char));
461       fread(this->filecontents2,offset_end_2 - offset_start_2,sizeof(char),this->input);
462       this->filecontents2[offset_end_2 - offset_start_2] = '\0';
463     }
464 #endif
465   }
466 
467   /* Read from filecontents */
468   while (nread < this->nspaces &&
469 	 (queryseq1 = Shortread_read_filecontents(&this->nextchar,&queryseq2,
470 						  &this->filecontents1,&this->filecontents2,&this->input,&this->input2,
471 #ifdef USE_MPI_FILE_INPUT
472 						  workers_comm,
473 #endif
474 						  this->interleavedp,
475 						  this->read_files_command,&this->files,&this->nfiles,
476 						  skipp = (this->inputid % part_interval != part_modulus))) != NULL) {
477     if (skipp) {
478 #if 0
479       /* Shortread procedures won't allocate in this situation */
480       Shortread_free(&queryseq1);
481       if (queryseq2 != NULL) {
482 	Shortread_free(&queryseq2);
483       }
484 #endif
485 
486     } else if (filter_if_both_p == true &&
487 	       Shortread_filterp(queryseq1) == true && (queryseq2 == NULL || Shortread_filterp(queryseq2) == true)) {
488       Shortread_free(&queryseq1);
489       if (queryseq2 != NULL) {
490 	Shortread_free(&queryseq2);
491       }
492 
493     } else if (filter_if_both_p == false &&
494 	       (Shortread_filterp(queryseq1) == true || (queryseq2 != NULL && Shortread_filterp(queryseq2) == true))) {
495       Shortread_free(&queryseq1);
496       if (queryseq2 != NULL) {
497 	Shortread_free(&queryseq2);
498       }
499 
500     } else {
501       this->buffer[nread++] = Request_new(this->requestid++,queryseq1,queryseq2);
502     }
503     this->inputid++;
504   }
505 
506   this->nleft = nread;
507   this->ptr = 0;
508 
509   /* Need to set this to the FILE * end (EOF at end of file), and not the filecontents end (always '\0') */
510   this->nextchar = nextchar_end;
511 
512 #ifdef USE_MPI
513   debug(printf("Worker %d: ",this->myid));
514 #endif
515   debug(printf("this->nextchar (nextchar_end) is %c (%d)\n",this->nextchar,this->nextchar));
516 
517   return nread;
518 }
519 
520 
521 
522 /* Used by ranks 1..n to communicate with Master_mpi_interface thread of rank 0 */
523 /* Returns number of requests read */
524 static unsigned int
fill_buffer_slave(T this)525 fill_buffer_slave (T this) {
526   unsigned int nread = 0;
527   Shortread_T queryseq1, queryseq2;
528   bool skipp, donep;
529 
530   int strlength1, strlength2;
531   MPI_Status status;
532   int offset_start_1, offset_end_1, offset_start_2, offset_end_2;
533   int nextchar_end;
534 #if 0
535   int nchars1, nchars2;		/* Doesn't need to be saved as a field in Inbuffer_T. */
536 #endif
537 
538   /* Need to receive nextchar_end because of the difference between
539      filecontents end ('\0') and FILE * end (EOF) */
540 
541   debug(fprintf(stdout,"Worker %d: sending notification to master process.  ",this->myid));
542   MPI_SEND(&this->nfiles,1,MPI_INT,/*dest*/0,/*tag*/MPI_TAG_WANT_INPUT,MPI_COMM_WORLD);
543   MPI_RECV(&this->nextchar,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
544   MPI_RECV(&nextchar_end,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
545   MPI_RECV(&donep,1,MPI_UNSIGNED_CHAR,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
546 
547 #if defined(HAVE_ZLIB) && defined(HAVE_BZLIB)
548   if (this->gzipped == NULL && this->bzipped == NULL) {
549     MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
550     MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
551     MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
552     MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
553     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
554 		  offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
555 
556     FREE_IN(this->filecontents1_alloc);
557     FREE_IN(this->filecontents2_alloc);
558     this->filecontents1 = (char *) NULL;
559     this->filecontents2 = (char *) NULL;
560 
561   } else {
562     this->filecontents1 = this->filecontents1_alloc =
563       Filestring_recv(&strlength1,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
564     this->filecontents2 = this->filecontents2_alloc =
565       Filestring_recv(&strlength2,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
566     debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
567   }
568 
569 #elif defined(HAVE_ZLIB)
570   if (this->gzipped == NULL) {
571     MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
572     MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
573     MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
574     MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
575     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
576 		  offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
577     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d\n",offset_start_1,offset_end_1,offset_start_2,offset_end_2));
578 
579     FREE_IN(this->filecontents1_alloc);
580     FREE_IN(this->filecontents2_alloc);
581     this->filecontents1 = (char *) NULL;
582     this->filecontents2 = (char *) NULL;
583 
584   } else {
585     this->filecontents1 = this->filecontents1_alloc =
586       Filestring_recv(&strlength1,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
587     this->filecontents2 = this->filecontents2_alloc =
588       Filestring_recv(&strlength2,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
589     debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
590   }
591 
592 #elif defined(HAVE_BZLIB)
593   if (this->bzipped == NULL) {
594     MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
595     MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
596     MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
597     MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
598     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
599 		  offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
600     debug(fprintf(stdout,"Received offsets %d..%d and %d..%d\n",offset_start_1,offset_end_1,offset_start_2,offset_end_2));
601 
602     FREE_IN(this->filecontents1_alloc);
603     FREE_IN(this->filecontents2_alloc);
604     this->filecontents1 = (char *) NULL;
605     this->filecontents2 = (char *) NULL;
606 
607   } else {
608     this->filecontents1 = this->filecontents1_alloc =
609       Filestring_recv(&strlength1,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
610     this->filecontents2 = this->filecontents2_alloc =
611       Filestring_recv(&strlength2,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD);
612     debug(fprintf(stdout,"Received filestrings of length %d and %d\n",strlength1,strlength2));
613   }
614 
615 #else
616   MPI_RECV(&offset_start_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
617   MPI_RECV(&offset_start_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
618   MPI_RECV(&offset_end_1,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
619   MPI_RECV(&offset_end_2,1,MPI_INT,/*source*/0,/*tag*/MPI_TAG_GIVE_INPUT,MPI_COMM_WORLD,&status);
620   debug(fprintf(stdout,"Received offsets %d..%d and %d..%d, nextchars %c..%c, donep %d\n",
621 		offset_start_1,offset_end_1,offset_start_2,offset_end_2,this->nextchar,nextchar_end,donep));
622 
623   FREE_IN(this->filecontents1_alloc);
624   FREE_IN(this->filecontents2_alloc);
625   this->filecontents1 = (char *) NULL;
626   this->filecontents2 = (char *) NULL;
627 #endif
628 
629 
630   if (this->filecontents1 == NULL) {
631 #if defined(USE_MPI_FILE_INPUT)
632     MPI_File_seek(this->input,offset_start_1,MPI_SEEK_SET);
633     this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
634     MPI_File_read(this->input,this->filecontents1,offset_end_1 - offset_start_1,MPI_CHAR,&status);
635     this->filecontents1[offset_end_1 - offset_start_1] = '\0';
636 
637     if (this->input2 != NULL) {
638       MPI_File_seek(this->input2,offset_start_2,MPI_SEEK_SET);
639       this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 1) * sizeof(char));
640       MPI_File_read(this->input2,this->filecontents2,offset_end_2 - offset_start_2,MPI_CHAR,&status);
641       this->filecontents2[offset_end_2 - offset_start_2] = '\0';
642     }
643 
644 #else
645 #ifdef HAVE_FSEEKO
646     fseeko(this->input,offset_start_1,SEEK_SET);
647 #else
648     fseek(this->input,offset_start_1,SEEK_SET);
649 #endif
650     this->filecontents1 = this->filecontents1_alloc = (char *) MALLOC_IN((offset_end_1 - offset_start_1 + 1) * sizeof(char));
651     fread(this->filecontents1,offset_end_1 - offset_start_1,sizeof(char),this->input);
652     this->filecontents1[offset_end_1 - offset_start_1] = '\0';
653     if (this->input2 != NULL) {
654 #ifdef HAVE_FSEEKO
655       fseeko(this->input2,offset_start_2,SEEK_SET);
656 #else
657       fseek(this->input2,offset_start_2,SEEK_SET);
658 #endif
659       this->filecontents2 = this->filecontents2_alloc = (char *) MALLOC_IN((offset_end_2 - offset_start_2 + 2) * sizeof(char));
660       fread(this->filecontents2,offset_end_2 - offset_start_2,sizeof(char),this->input);
661       this->filecontents2[offset_end_2 - offset_start_2] = '\0';
662     }
663 #endif
664   }
665 
666   /* Read from filecontents */
667   while (nread < this->nspaces &&
668 	 (queryseq1 = Shortread_read_filecontents(&this->nextchar,&queryseq2,
669 						  &this->filecontents1,&this->filecontents2,&this->input,&this->input2,
670 #ifdef USE_MPI_FILE_INPUT
671 						  workers_comm,
672 #endif
673 						  this->read_files_command,&this->files,&this->nfiles,
674 						  skipp = (this->inputid % part_interval != part_modulus))) != NULL) {
675     if (skipp) {
676 #if 0
677       /* Shortread procedures won't allocate in this situation */
678       Shortread_free(&queryseq1);
679       if (queryseq2 != NULL) {
680 	Shortread_free(&queryseq2);
681       }
682 #endif
683 
684     } else if (filter_if_both_p == true &&
685 	       Shortread_filterp(queryseq1) == true && (queryseq2 == NULL || Shortread_filterp(queryseq2) == true)) {
686       Shortread_free(&queryseq1);
687       if (queryseq2 != NULL) {
688 	Shortread_free(&queryseq2);
689       }
690 
691     } else if (filter_if_both_p == false &&
692 	       (Shortread_filterp(queryseq1) == true || (queryseq2 != NULL && Shortread_filterp(queryseq2) == true))) {
693       Shortread_free(&queryseq1);
694       if (queryseq2 != NULL) {
695 	Shortread_free(&queryseq2);
696       }
697 
698     } else {
699       this->buffer[nread++] = Request_new(this->requestid++,queryseq1,queryseq2);
700     }
701     this->inputid++;
702   }
703 
704   this->nleft = nread;
705   this->ptr = 0;
706 
707   /* Need to set this to the FILE * end (EOF at end of file), and not the filecontents end (always '\0') */
708   this->nextchar = nextchar_end;
709 
710 #ifdef USE_MPI
711   debug(printf("Worker %d: ",this->myid));
712 #endif
713   debug(printf("this->nextchar (nextchar_end) is %c (%d)\n",this->nextchar,this->nextchar));
714 
715   return nread;
716 }
717 
718 #elif defined(GSNAP)
719 
720 /* Returns number of requests read */
721 static unsigned int
fill_buffer(T this)722 fill_buffer (T this) {
723   unsigned int nread = 0;
724   Shortread_T queryseq1, queryseq2;
725   bool skipp;
726   int nchars1 = 0, nchars2 = 0;		/* Returned only because MPI master needs it.  Doesn't need to be saved as a field in Inbuffer_T. */
727 
728   /* fprintf(stderr,"Entered fill_buffer\n"); */
729   while (nread < this->nspaces &&
730 	 (queryseq1 = Shortread_read(&this->nextchar,&nchars1,&nchars2,&queryseq2,
731 				     &this->input,&this->input2,
732 #ifdef HAVE_ZLIB
733 				     &this->gzipped,&this->gzipped2,
734 #endif
735 #ifdef HAVE_BZLIB
736 				     &this->bzipped,&this->bzipped2,
737 #endif
738 				     this->interleavedp,
739 				     this->read_files_command,&this->files,&this->nfiles,
740 				     skipp = (this->inputid % part_interval != part_modulus))) != NULL) {
741     if (skipp) {
742 #if 0
743       /* Shortread procedures won't allocate in this situation */
744       Shortread_free(&queryseq1);
745       if (queryseq2 != NULL) {
746 	Shortread_free(&queryseq2);
747       }
748 #endif
749 
750     } else if (filter_if_both_p == true &&
751 	       Shortread_filterp(queryseq1) == true && (queryseq2 == NULL || Shortread_filterp(queryseq2) == true)) {
752       Shortread_free(&queryseq1);
753       if (queryseq2 != NULL) {
754 	Shortread_free(&queryseq2);
755       }
756 
757     } else if (filter_if_both_p == false &&
758 	       (Shortread_filterp(queryseq1) == true || (queryseq2 != NULL && Shortread_filterp(queryseq2) == true))) {
759       Shortread_free(&queryseq1);
760       if (queryseq2 != NULL) {
761 	Shortread_free(&queryseq2);
762       }
763 
764     } else {
765       this->buffer[nread++] = Request_new(this->requestid++,queryseq1,queryseq2);
766     }
767     this->inputid++;
768   }
769   /* fprintf(stderr,"Read %d reads\n",nread); */
770 
771   this->nleft = nread;
772   this->ptr = 0;
773 
774   return nread;
775 }
776 
777 #else
778 
779 /* GMAP version */
780 /* Returns number of requests read */
781 static unsigned int
fill_buffer(T this)782 fill_buffer (T this) {
783   unsigned int nread = 0;
784 #if 0
785   unsigned int nchars = 0U;
786 #endif
787   Sequence_T queryseq;
788 
789   while (nread < this->nspaces &&
790 	 (queryseq = Sequence_read_multifile(&this->nextchar,&this->input,
791 					     this->read_files_command,&this->files,&this->nfiles)) != NULL) {
792     if (this->inputid % part_interval != part_modulus) {
793       Sequence_free(&queryseq);
794     } else {
795       debug(printf("inbuffer creating request %d\n",this->requestid));
796       this->buffer[nread++] = Request_new(this->requestid++,queryseq);
797 #if 0
798       nchars += Sequence_fulllength(queryseq);
799 #endif
800     }
801     this->inputid++;
802   }
803 
804   this->nleft = nread;
805   this->ptr = 0;
806 
807   return nread;
808 }
809 
810 #endif
811 
812 
813 #ifndef USE_MPI
814 /* No need to lock, since only main thread calls */
815 /* Returns nread to give to Outbuffer_new */
816 unsigned int
Inbuffer_fill_init(T this)817 Inbuffer_fill_init (T this) {
818   unsigned int nread;
819 
820   debug(printf("inbuffer filling initially\n"));
821   nread = fill_buffer(this);
822   debug(printf("inbuffer read %d sequences\n",nread));
823 
824   return nread;
825 }
826 #endif
827 
828 
829 Request_T
830 #ifdef GSNAP
Inbuffer_get_request(T this)831 Inbuffer_get_request (T this)
832 #else
833 Inbuffer_get_request (Sequence_T *pairalign_segment, T this)
834 #endif
835 {
836   Request_T request;
837   unsigned int nread;
838 
839   debug(printf("Calling Inbuffer_get_request\n"));
840 
841 #if defined(HAVE_PTHREAD)
842   pthread_mutex_lock(&this->lock);
843 #endif
844 
845   if (this->nleft > 0) {
846     request = this->buffer[this->ptr++];
847     this->nleft -= 1;
848 
849 #if 0
850   } else if (this->nextchar == EOF) {
851     /* Causes --force-single-end to fail when reads in a file are a multiple of nspaces */
852     /* Want to call fill_buffer to find out if the input is exhausted */
853     Outbuffer_add_nread(this->outbuffer,/*nread*/0);
854     request = NULL;
855 #endif
856 
857   } else {
858 #ifdef USE_MPI
859     debug(printf("Worker %d: ",this->myid));
860 #endif
861     debug(printf("inbuffer filling with nextchar %c (%d)\n",this->nextchar,this->nextchar));
862 
863 #ifndef GSNAP
864     if (user_pairalign_p == true) {
865       /* assert(this->nspaces == 1) */
866       if (this->pairalign_segment != NULL) {
867 	Sequence_free(&this->pairalign_segment);
868       }
869       this->pairalign_segment = Sequence_read_unlimited(&this->nextchar,stdin);
870       debug(printf("  but first reading usersegment, got nextchar %c\n",this->nextchar));
871     }
872 #endif
873 
874 #ifdef USE_MPI
875     if (this->myid == 0) {
876       nread = fill_buffer_master(this);
877     } else {
878       nread = fill_buffer_slave(this);
879     }
880 #else
881     nread = fill_buffer(this);
882 #endif
883 
884     Outbuffer_add_nread(this->outbuffer,nread);
885 #ifdef USE_MPI
886     debug(printf("Worker %d: ",this->myid));
887 #endif
888     debug(printf("inbuffer read %d sequences\n",nread));
889 
890     if (nread == 0) {
891       /* Still empty */
892       request = NULL;
893     } else {
894       request = this->buffer[this->ptr++];
895       this->nleft -= 1;
896     }
897   }
898 
899 #ifndef GSNAP
900   *pairalign_segment = this->pairalign_segment;
901 #endif
902 
903 #if defined(HAVE_PTHREAD)
904   pthread_mutex_unlock(&this->lock);
905 #endif
906 
907   return request;
908 }
909 
910 
911 #ifndef GSNAP
912 /* Same as Inbuffer_get_request, but leaves sequence in buffer.  Used by GMAP for selfalign feature. */
913 Request_T
Inbuffer_first_request(T this)914 Inbuffer_first_request (T this) {
915   Request_T request;
916   unsigned int nread;
917 
918   debug(printf("Calling Inbuffer_first_request\n"));
919 
920 #if defined(HAVE_PTHREAD)
921   pthread_mutex_lock(&this->lock);
922 #endif
923 
924   if (this->nleft > 0) {
925     request = this->buffer[this->ptr/*++*/];
926     /* this->nleft -= 1; */
927 
928   } else {
929     debug(printf("inbuffer filling\n"));
930     nread = fill_buffer(this);
931     Outbuffer_add_nread(this->outbuffer,nread);
932     debug(printf("inbuffer read %d sequences\n",nread));
933 
934     if (nread == 0) {
935       /* Still empty */
936       request = NULL;
937     } else {
938       request = this->buffer[this->ptr/*++*/];
939       /* this->nleft -= 1; */
940     }
941   }
942 
943 #if defined(HAVE_PTHREAD)
944   pthread_mutex_unlock(&this->lock);
945 #endif
946 
947   return request;
948 }
949 #endif
950 
951 
952