/* Version-control $Id$ keyword string identifying this revision of the file */
static char rcsid[] = "$Id: outbuffer.c 218147 2019-01-16 21:28:41Z twu $";
2 #ifdef HAVE_CONFIG_H
3 #include <config.h>
4 #endif
5 
6 #include "outbuffer.h"
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 
11 #ifdef HAVE_PTHREAD
12 #ifdef HAVE_SYS_TYPES_H
13 #include <sys/types.h>		/* Needed to define pthread_t on Solaris */
14 #endif
15 #include <pthread.h>
16 #endif
17 
18 #include "assert.h"
19 #include "bool.h"
20 #include "mem.h"
21 #include "samheader.h"
22 
23 
/* Compile-time debugging hooks.  Each macro passes its argument through
   unchanged when the matching symbol (DEBUGM, DEBUG, DEBUG1) is defined at
   compile time, and expands to nothing otherwise, so debugging statements
   cost nothing in normal builds. */

/* MPI processing */
#ifdef DEBUGM
#define debugm(x) x
#else
#define debugm(x)
#endif

#ifdef DEBUG
#define debug(x) x
#else
#define debug(x)
#endif

/* List-structure dumps (RRlist_dump) */
#ifdef DEBUG1
#define debug1(x) x
#else
#define debug1(x)
#endif
42 
43 
/* sam-to-bam conversions always need the headers */
/* When defined, file headers are written as soon as an output file is
   created or touched, even if no records are ever written to it. */
#define SAM_HEADERS_ON_EMPTY_FILES 1

/* Command line, saved by Outbuffer_setup for the @PG and GFF header lines */
static int argc;
static char **argv;
static int optind_save;

/* Run configuration copied in by Outbuffer_setup */
static Univ_IIT_T chromosome_iit;	/* Passed to Univ_IIT_dump_sam for SAM headers */
static bool any_circular_p;		/* If false, circular split outputs are not touched */
static int nworkers;			/* Passed to SAM_header_print_HD */
static bool orderedp;			/* Passed to SAM_header_print_HD */
static bool quiet_if_excessive_p;	/* If true, the remaining split outputs are also touched */

#ifdef GSNAP
static Outputtype_T output_type;
#else
static Printtype_T printtype;
static Sequence_T usersegment;		/* If non-NULL, SAM headers describe this segment */
#endif

/* SAM header options; a NULL read-group id suppresses the @RG line */
static bool sam_headers_p;
static char *sam_read_group_id;
static char *sam_read_group_name;
static char *sam_read_group_library;
static char *sam_read_group_platform;

/* Output destinations */
static bool appendp;			/* Append to, rather than truncate, existing files */
static char *output_file;		/* Single output file; NULL means stdout */
static char *split_output_root;		/* If non-NULL, results go to per-category split files */
static char *failedinput_root;		/* If non-NULL, failed inputs are written under this name */

/* outputs[0] is the single output stream; outputs[1..N_SPLIT_OUTPUTS] are
   indexed by SAM_split_output_type when split_output_root is set */
#ifdef USE_MPI
static MPI_File *outputs;
static MPI_File output_failedinput;
#ifdef GSNAP
static MPI_File output_failedinput_1;
static MPI_File output_failedinput_2;
#endif


#else
static char *write_mode;		/* "a" when appendp, else "w"; used to fopen failed-input files */
static FILE **outputs = NULL;
static FILE *output_failedinput;
#ifdef GSNAP
static FILE *output_failedinput_1;
static FILE *output_failedinput_2;
#endif

#endif
94 
95 
96 #ifndef GSNAP
97 /* Taken from Univ_IIT_dump_sam */
98 static void
dump_sam_usersegment(FILE * fp,Sequence_T usersegment,char * sam_read_group_id,char * sam_read_group_name,char * sam_read_group_library,char * sam_read_group_platform)99 dump_sam_usersegment (FILE *fp, Sequence_T usersegment,
100 		      char *sam_read_group_id, char *sam_read_group_name,
101 		      char *sam_read_group_library, char *sam_read_group_platform) {
102 
103   fprintf(fp,"@SQ\tSN:%s",Sequence_accession(usersegment));
104   fprintf(fp,"\tLN:%u",Sequence_fulllength(usersegment));
105   fprintf(fp,"\n");
106 
107   if (sam_read_group_id != NULL) {
108     fprintf(fp,"@RG\tID:%s",sam_read_group_id);
109     if (sam_read_group_platform != NULL) {
110       fprintf(fp,"\tPL:%s",sam_read_group_platform);
111     }
112     if (sam_read_group_library != NULL) {
113       fprintf(fp,"\tLB:%s",sam_read_group_library);
114     }
115     fprintf(fp,"\tSM:%s",sam_read_group_name);
116     fprintf(fp,"\n");
117   }
118 
119   return;
120 }
121 #endif
122 
123 
124 #ifndef GSNAP
125 static void
print_gff_header(FILE * fp,int argc,char ** argv,int optind)126 print_gff_header (FILE *fp, int argc, char **argv, int optind) {
127   char **argstart;
128   int c;
129 
130   fprintf(fp,"##gff-version   3\n");
131   fprintf(fp,"# Generated by GMAP version %s using call: ",PACKAGE_VERSION);
132   argstart = &(argv[-optind]);
133   for (c = 0; c < argc + optind; c++) {
134     fprintf(fp," %s",argstart[c]);
135   }
136   fprintf(fp,"\n");
137   return;
138 }
139 #endif
140 
141 
/* Writes the file-level header lines for one output stream, according to the
   configured output format.  A NULL stream is silently ignored.

   GSNAP: for SAM output with headers enabled, writes @HD, @PG, and whatever
   Univ_IIT_dump_sam emits for chromosome_iit and the read group.

   GMAP/PMAP: the GFF3 print types get a GFF header.  In GMAP only (not PMAP),
   SAM output with headers enabled gets one of two things:
     - dump_sam_usersegment alone (no @HD/@PG) when a user segment was given;
     - otherwise the same @HD/@PG/Univ_IIT_dump_sam sequence as GSNAP. */
static void
print_file_headers (
#ifdef USE_MPI
		    MPI_File output
#else
		    FILE *output
#endif
		    ) {

  if (output == NULL) {
    /* Possible since we are no longer creating a file for OUTPUT_NONE */
    return;
  }

#ifdef GSNAP
  if (output_type == SAM_OUTPUT && sam_headers_p == true) {
    SAM_header_print_HD(output,nworkers,orderedp);
    SAM_header_print_PG(output,argc,argv,optind_save);
    Univ_IIT_dump_sam(output,chromosome_iit,sam_read_group_id,sam_read_group_name,
		      sam_read_group_library,sam_read_group_platform);
  }

#else
  /* NOTE(review): print_gff_header and dump_sam_usersegment take a FILE *, so
     this branch appears not to type-check when built with USE_MPI (output is
     an MPI_File).  Presumably non-GSNAP programs are never built with MPI;
     confirm. */
  if (printtype == GFF3_GENE || printtype == GFF3_MATCH_CDNA || printtype == GFF3_MATCH_EST) {
    print_gff_header(output,argc,argv,optind_save);

#ifndef PMAP
  } else if (printtype == SAM && sam_headers_p == true) {
    if (usersegment != NULL) {
      dump_sam_usersegment(output,usersegment,sam_read_group_id,sam_read_group_name,
			   sam_read_group_library,sam_read_group_platform);
    } else {
      SAM_header_print_HD(output,nworkers,orderedp);
      SAM_header_print_PG(output,argc,argv,optind_save);
      Univ_IIT_dump_sam(output,chromosome_iit,sam_read_group_id,sam_read_group_name,
			sam_read_group_library,sam_read_group_platform);
    }
#endif

  }
#endif

  return;
}
186 
187 
188 static void
failedinput_open(char * failedinput_root)189 failedinput_open (char *failedinput_root) {
190   char *filename;
191 
192 #ifdef GSNAP
193   filename = (char *) MALLOC((strlen(failedinput_root)+strlen(".1")+1) * sizeof(char));
194   sprintf(filename,"%s.1",failedinput_root);
195 
196 #ifdef USE_MPI
197   if (appendp == true) {
198     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
199                   MPI_INFO_NULL,&output_failedinput_1);
200   } else {
201     /* Need to remove existing file, if any */
202     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
203 		  MPI_INFO_NULL,&output_failedinput_1);
204     MPI_File_close(&output_failedinput_1);
205     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
206 		  MPI_INFO_NULL,&output_failedinput_1);
207   }
208 #else
209   if ((output_failedinput_1 = fopen(filename,write_mode)) == NULL) {
210     fprintf(stderr,"Cannot open file %s for writing\n",filename);
211     exit(9);
212   }
213 #endif
214 
215   /* Re-use filename, since it is the same length */
216   sprintf(filename,"%s.2",failedinput_root);
217 #ifdef USE_MPI
218   if (appendp == true) {
219     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
220 		  MPI_INFO_NULL,&output_failedinput_2);
221   } else {
222     /* Need to remove existing file, if any */
223     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
224 		  MPI_INFO_NULL,&output_failedinput_2);
225     MPI_File_close(&output_failedinput_2);
226     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
227 		  MPI_INFO_NULL,&output_failedinput_2);
228   }
229 #else
230   if ((output_failedinput_2 = fopen(filename,write_mode)) == NULL) {
231     fprintf(stderr,"Cannot open file %s for writing\n",filename);
232     exit(9);
233   }
234 #endif
235 
236   /* Re-use filename, since it is shorter */
237   sprintf(filename,"%s",failedinput_root);
238 #ifdef USE_MPI
239   if (appendp == true) {
240     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
241                   MPI_INFO_NULL,&output_failedinput);
242   } else {
243     /* Need to remove existing file, if any */
244     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
245 		  MPI_INFO_NULL,&output_failedinput);
246     MPI_File_close(&output_failedinput);
247     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
248 		  MPI_INFO_NULL,&output_failedinput);
249   }
250 #else
251   if ((output_failedinput = fopen(filename,write_mode)) == NULL) {
252     fprintf(stderr,"Cannot open file %s for writing\n",filename);
253     exit(9);
254   }
255 #endif
256   FREE(filename);
257 
258 
259 #else  /* GMAP */
260   filename = (char *) MALLOC((strlen(failedinput_root)+1) * sizeof(char));
261   sprintf(filename,"%s",failedinput_root);
262 #ifdef USE_MPI
263   if (appendp == true) {
264     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
265 		  MPI_INFO_NULL,&output_failedinput);
266   } else {
267     /* Need to remove existing file, if any */
268     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
269 		  MPI_INFO_NULL,&output_failedinput);
270     MPI_File_close(&output_failedinput);
271     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
272 		  MPI_INFO_NULL,&output_failedinput);
273   }
274 #else
275   if ((output_failedinput = fopen(filename,write_mode)) == NULL) {
276     fprintf(stderr,"Cannot open file %s for writing\n",filename);
277     exit(9);
278   }
279 #endif
280   FREE(filename);
281 
282 #endif	/* GSNAP */
283 
284   return;
285 }
286 
287 
/* Records the run-wide configuration used by the output routines and
   prepares the output-stream table.

   argc_in, argv_in and optind_in are kept for the @PG and GFF header lines
   (see print_file_headers).

   USE_MPI: every output file is opened here, because all processes must call
   MPI_File_open.  Non-MPI: only the stream table is allocated; the output
   thread opens files lazily.

   In both builds the failed-input files are opened here when
   failedinput_root_in is non-NULL. */
void
Outbuffer_setup (int argc_in, char **argv_in, int optind_in,
		 Univ_IIT_T chromosome_iit_in, bool any_circular_p_in,
		 int nworkers_in, bool orderedp_in, bool quiet_if_excessive_p_in,
#ifdef GSNAP
		 Outputtype_T output_type_in,
#else
		 Printtype_T printtype_in, Sequence_T usersegment_in,
#endif
		 bool sam_headers_p_in, char *sam_read_group_id_in, char *sam_read_group_name_in,
		 char *sam_read_group_library_in, char *sam_read_group_platform_in,
		 bool appendp_in, char *output_file_in, char *split_output_root_in, char *failedinput_root_in) {
#ifdef USE_MPI
  SAM_split_output_type split_output;
#endif


  argc = argc_in;
  argv = argv_in;
  optind_save = optind_in;

  chromosome_iit = chromosome_iit_in;
  any_circular_p = any_circular_p_in;

  nworkers = nworkers_in;
  orderedp = orderedp_in;
  quiet_if_excessive_p = quiet_if_excessive_p_in;

#ifdef GSNAP
  output_type = output_type_in;
#else
  printtype = printtype_in;
  usersegment = usersegment_in;
#endif

  sam_headers_p = sam_headers_p_in;
  sam_read_group_id = sam_read_group_id_in;
  sam_read_group_name = sam_read_group_name_in;
  sam_read_group_library = sam_read_group_library_in;
  sam_read_group_platform = sam_read_group_platform_in;

  appendp = appendp_in;
  split_output_root = split_output_root_in;
  output_file = output_file_in;


  /************************************************************************/
  /* Output files */
  /************************************************************************/

#ifdef USE_MPI
  /* All processes need to run MPI_File_open, and need to open all files now */
  outputs = (MPI_File *) CALLOC_KEEP(1+N_SPLIT_OUTPUTS,sizeof(MPI_File));
  if (split_output_root != NULL) {
    /* One file per split category, indexed 1..N_SPLIT_OUTPUTS */
    for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
      outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
#ifdef SAM_HEADERS_ON_EMPTY_FILES
      print_file_headers(outputs[split_output]);
#endif
    }

  } else if (output_file != NULL) {
    /* Single file: every split category aliases outputs[0] */
    outputs[0] = SAM_header_open_file(OUTPUT_FILE,/*split_output_root*/output_file,appendp);
#ifdef SAM_HEADERS_ON_EMPTY_FILES
    print_file_headers(outputs[0]);
#endif
    for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
      outputs[split_output] = outputs[0];
    }

  } else {
    /* Write to stdout */
    outputs[0] = (MPI_File) NULL;
#ifdef SAM_HEADERS_ON_EMPTY_FILES
    /* NOTE(review): print_file_headers returns immediately for a NULL stream,
       so this call writes nothing; confirm whether stdout headers are
       emitted elsewhere in the MPI build. */
    print_file_headers(outputs[0]);
#endif
    for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
      outputs[split_output] = (MPI_File) NULL;
    }
  }

#else
  /* Only the output thread needs to run fopen, and can open files when needed */
  if (appendp == true) {
    write_mode = "a";
  } else {
    write_mode = "w";
  }
  /* The lazy-open logic treats NULL entries as "not yet opened"; this assumes
     CALLOC_KEEP zero-fills like calloc */
  outputs = (FILE **) CALLOC_KEEP(1+N_SPLIT_OUTPUTS,sizeof(FILE *));
#endif


  /************************************************************************/
  /* Failed input files */
  /************************************************************************/

  failedinput_root = failedinput_root_in;
  if (failedinput_root == NULL) {
    output_failedinput = NULL;
#ifdef GSNAP
    output_failedinput_1 = output_failedinput_2 = NULL;
#endif
  } else {
    failedinput_open(failedinput_root);
  }

  return;
}
396 
397 
398 void
Outbuffer_cleanup()399 Outbuffer_cleanup () {
400   FREE_KEEP(outputs);		/* Matches CALLOC_KEEP in Outbuffer_setup */
401   return;
402 }
403 
404 
/* Singly linked list node carrying one unit of output, from a worker thread
   to the output thread: the main output plus any failed-input text.
   The list is used in two ways:
     - as a FIFO (RRlist_push / RRlist_pop);
     - as a list kept sorted by id (RRlist_insert / RRlist_pop_id). */
typedef struct RRlist_T *RRlist_T;
struct RRlist_T {
  int id;			/* Sort key for RRlist_insert / RRlist_pop_id */
  Filestring_T fp;		/* Main output */
  Filestring_T fp_failedinput;	/* Failed-input text, or NULL */
#ifdef GSNAP
  Filestring_T fp_failedinput_1;	/* Failed-input text for <root>.1, or NULL */
  Filestring_T fp_failedinput_2;	/* Failed-input text for <root>.2, or NULL */
#endif
  RRlist_T next;
};
416 
417 
#ifdef DEBUG1
/* Debugging aid: prints the head pointer, every node with its successor, and
   the tail pointer.  %p requires a void * argument, so each node pointer is
   converted explicitly; passing a struct pointer directly is undefined
   behavior. */
static void
RRlist_dump (RRlist_T head, RRlist_T tail) {
  RRlist_T this;

  printf("head %p\n",(void *) head);
  for (this = head; this != NULL; this = this->next) {
    printf("%p: next %p\n",(void *) this,(void *) this->next);
  }
  printf("tail %p\n",(void *) tail);
  printf("\n");
  return;
}
#endif
432 
433 
434 /* Returns new tail */
435 static RRlist_T
RRlist_push(RRlist_T * head,RRlist_T tail,Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)436 RRlist_push (RRlist_T *head, RRlist_T tail, Filestring_T fp,
437 	     Filestring_T fp_failedinput
438 #ifdef GSNAP
439 	     , Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2
440 #endif
441 	     ) {
442   RRlist_T new;
443 
444   new = (RRlist_T) MALLOC_OUT(sizeof(*new)); /* Called by worker thread */
445   new->fp = fp;
446   new->fp_failedinput = fp_failedinput;
447 #ifdef GSNAP
448   new->fp_failedinput_1 = fp_failedinput_1;
449   new->fp_failedinput_2 = fp_failedinput_2;
450 #endif
451   new->next = (RRlist_T) NULL;
452 
453   if (*head == NULL) {		/* Equivalent to tail == NULL, but using *head avoids having to set tail in RRlist_pop */
454     *head = new;
455   } else {
456     tail->next = new;
457   }
458 
459   return new;
460 }
461 
462 
463 /* Returns new head */
464 static RRlist_T
RRlist_pop(RRlist_T head,Filestring_T * fp,Filestring_T * fp_failedinput,Filestring_T * fp_failedinput_1,Filestring_T * fp_failedinput_2)465 RRlist_pop (RRlist_T head, Filestring_T *fp,
466 	    Filestring_T *fp_failedinput
467 #ifdef GSNAP
468 	    , Filestring_T *fp_failedinput_1, Filestring_T *fp_failedinput_2
469 #endif
470 	    ) {
471   RRlist_T newhead;
472 
473   *fp = head->fp;
474   *fp_failedinput = head->fp_failedinput;
475 #ifdef GSNAP
476   *fp_failedinput_1 = head->fp_failedinput_1;
477   *fp_failedinput_2 = head->fp_failedinput_2;
478 #endif
479 
480   newhead = head->next;
481 
482   FREE_OUT(head);		/* Called by outbuffer thread */
483   return newhead;
484 }
485 
486 
487 static RRlist_T
RRlist_insert(RRlist_T list,int id,Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)488 RRlist_insert (RRlist_T list, int id, Filestring_T fp,
489 	       Filestring_T fp_failedinput
490 #ifdef GSNAP
491 	       , Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2
492 #endif
493 	       ) {
494   RRlist_T *p;
495   RRlist_T new;
496 
497   p = &list;
498   while (*p != NULL && id > (*p)->id) {
499     p = &(*p)->next;
500   }
501 
502   new = (RRlist_T) MALLOC_OUT(sizeof(*new));
503   new->id = id;
504   new->fp = fp;
505   new->fp_failedinput = fp_failedinput;
506 #ifdef GSNAP
507   new->fp_failedinput_1 = fp_failedinput_1;
508   new->fp_failedinput_2 = fp_failedinput_2;
509 #endif
510 
511   new->next = *p;
512   *p = new;
513   return list;
514 }
515 
516 /* Returns new head */
517 static RRlist_T
RRlist_pop_id(RRlist_T head,int * id,Filestring_T * fp,Filestring_T * fp_failedinput,Filestring_T * fp_failedinput_1,Filestring_T * fp_failedinput_2)518 RRlist_pop_id (RRlist_T head, int *id, Filestring_T *fp,
519 	       Filestring_T *fp_failedinput
520 #ifdef GSNAP
521 	       , Filestring_T *fp_failedinput_1, Filestring_T *fp_failedinput_2
522 #endif
523 	       ) {
524   RRlist_T newhead;
525 
526   *id = head->id;
527   *fp = head->fp;
528   *fp_failedinput = head->fp_failedinput;
529 #ifdef GSNAP
530   *fp_failedinput_1 = head->fp_failedinput_1;
531   *fp_failedinput_2 = head->fp_failedinput_2;
532 #endif
533 
534   newhead = head->next;
535 
536   FREE_OUT(head);		/* Called by outbuffer thread */
537   return newhead;
538 }
539 
540 
#define T Outbuffer_T
/* State shared between the input side, the worker threads and the output
   thread.  Under pthreads, the counters and the queue are updated with lock
   held. */
struct T {

#ifdef HAVE_PTHREAD
  pthread_mutex_t lock;
#endif

  unsigned int output_buffer_size;	/* Backlog threshold checked by Outbuffer_thread_anyorder */
  unsigned int nread;		/* Reads registered so far (Outbuffer_new, Outbuffer_add_nread) */
  unsigned int ntotal;		/* (unsigned int) -1 until Outbuffer_add_nread(0) marks end of input,
				   then the final read count; under USE_MPI it also tracks nread */
  unsigned int nbeyond;		/* MPI request that is beyond the given inputs */
  unsigned int nprocessed;	/* Results queued via Outbuffer_put_filestrings */

  RRlist_T head;		/* FIFO of queued results awaiting output */
  RRlist_T tail;		/* Last FIFO node; may be stale once head becomes NULL */

#ifdef HAVE_PTHREAD
  pthread_cond_t filestring_avail_p;	/* Signaled when a result is queued or ntotal/nbeyond changes */
#endif
};
561 
562 
563 /************************************************************************
564  *   File routines
565  ************************************************************************/
566 
567 
568 T
Outbuffer_new(unsigned int output_buffer_size,unsigned int nread)569 Outbuffer_new (unsigned int output_buffer_size, unsigned int nread) {
570   T new = (T) MALLOC_KEEP(sizeof(*new));
571 
572 #ifdef HAVE_PTHREAD
573   pthread_mutex_init(&new->lock,NULL);
574 #endif
575 
576   new->output_buffer_size = output_buffer_size;
577   new->nread = nread;
578 
579   /* Set to infinity until all reads are input.  Used for Pthreads version */
580   new->ntotal = (unsigned int) -1;
581 
582   new->nbeyond = 0;
583   new->nprocessed = 0;
584 
585   new->head = (RRlist_T) NULL;
586   new->tail = (RRlist_T) NULL;
587 
588 #ifdef HAVE_PTHREAD
589   pthread_cond_init(&new->filestring_avail_p,NULL);
590 #endif
591 
592   return new;
593 }
594 
595 
596 
597 #ifndef USE_MPI
598 /* Open empty files, and add SAM headers if SAM_HEADERS_ON_EMPTY_FILES is set */
599 static void
touch_all_single_outputs(FILE ** outputs,char * split_output_root,bool appendp)600 touch_all_single_outputs (FILE **outputs, char *split_output_root, bool appendp) {
601   SAM_split_output_type split_output;
602 
603   split_output = 1;
604   while (split_output <= N_SPLIT_OUTPUTS_SINGLE_STD) {
605     if (outputs[split_output] == NULL) {
606       outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
607 #ifdef SAM_HEADERS_ON_EMPTY_FILES
608       print_file_headers(outputs[split_output]);
609 #endif
610     }
611     split_output++;
612   }
613 
614   if (any_circular_p == false) {
615     split_output = N_SPLIT_OUTPUTS_SINGLE_TOCIRC + 1;
616   } else {
617     while (split_output <= N_SPLIT_OUTPUTS_SINGLE_TOCIRC) {
618       if (outputs[split_output] == NULL) {
619 	outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
620 #ifdef SAM_HEADERS_ON_EMPTY_FILES
621         print_file_headers(outputs[split_output]);
622 #endif
623       }
624       split_output++;
625     }
626   }
627 
628   if (quiet_if_excessive_p == true) {
629     while (split_output <= N_SPLIT_OUTPUTS_SINGLE) {
630       if (outputs[split_output] == NULL) {
631 	outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
632 #ifdef SAM_HEADERS_ON_EMPTY_FILES
633         print_file_headers(outputs[split_output]);
634 #endif
635       }
636       split_output++;
637     }
638   }
639 
640   return;
641 }
642 #endif
643 
644 
645 #ifndef USE_MPI
646 /* Open empty files, and add SAM headers if SAM_HEADERS_ON_EMPTY_FILES is set */
647 static void
touch_all_paired_outputs(FILE ** outputs,char * split_output_root,bool appendp)648 touch_all_paired_outputs (FILE **outputs, char *split_output_root, bool appendp) {
649   SAM_split_output_type split_output;
650 
651   split_output = N_SPLIT_OUTPUTS_SINGLE + 1;
652   while (split_output <= N_SPLIT_OUTPUTS_STD) {
653     if (outputs[split_output] == NULL) {
654       outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
655 #ifdef SAM_HEADERS_ON_EMPTY_FILES
656       print_file_headers(outputs[split_output]);
657 #endif
658     }
659     split_output++;
660   }
661 
662   if (any_circular_p == false) {
663     split_output = N_SPLIT_OUTPUTS_TOCIRC + 1;
664   } else {
665     while (split_output <= N_SPLIT_OUTPUTS_TOCIRC) {
666       if (outputs[split_output] == NULL) {
667 	outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
668 #ifdef SAM_HEADERS_ON_EMPTY_FILES
669         print_file_headers(outputs[split_output]);
670 #endif
671       }
672       split_output++;
673     }
674   }
675 
676   if (quiet_if_excessive_p == true) {
677     while (split_output <= N_SPLIT_OUTPUTS) {
678       if (outputs[split_output] == NULL) {
679 	outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
680 #ifdef SAM_HEADERS_ON_EMPTY_FILES
681         print_file_headers(outputs[split_output]);
682 #endif
683       }
684       split_output++;
685     }
686   }
687 
688   return;
689 }
690 #endif
691 
692 
693 #ifndef USE_MPI
694 static bool
paired_outputs_p(FILE ** outputs)695 paired_outputs_p (FILE **outputs) {
696   SAM_split_output_type split_output;
697 
698   split_output = N_SPLIT_OUTPUTS_SINGLE + 1;
699   while (split_output <= N_SPLIT_OUTPUTS) {
700     if (outputs[split_output] != NULL) {
701       return true;
702     }
703     split_output++;
704   }
705 
706   return false;
707 }
708 #endif
709 
710 
711 #ifndef USE_MPI
712 static void
touch_all_files(FILE ** outputs,char * split_output_root,bool appendp)713 touch_all_files (FILE **outputs, char *split_output_root, bool appendp) {
714   touch_all_single_outputs(outputs,split_output_root,appendp);
715   if (paired_outputs_p(outputs) == true) {
716     touch_all_paired_outputs(outputs,split_output_root,appendp);
717   }
718   return;
719 }
720 #endif
721 
722 
723 
724 void
Outbuffer_close_files()725 Outbuffer_close_files () {
726   SAM_split_output_type split_output;
727 
728   if (failedinput_root != NULL) {
729 #ifdef USE_MPI
730     MPI_File_close(&output_failedinput);
731 #ifdef GSNAP
732     MPI_File_close(&output_failedinput_1);
733     MPI_File_close(&output_failedinput_2);
734 #endif
735 
736 #else
737     fclose(output_failedinput);
738 #ifdef GSNAP
739     fclose(output_failedinput_1);
740     fclose(output_failedinput_2);
741 #endif
742 #endif
743 
744   }
745 
746 #ifdef USE_MPI
747   if (split_output_root != NULL) {
748     for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
749       MPI_File_close(&(outputs[split_output]));
750     }
751   } else if (output_file != NULL) {
752     MPI_File_close(&(outputs[0]));
753   } else {
754     /* Wrote to stdout */
755   }
756 
757 #else
758   if (split_output_root != NULL) {
759     touch_all_files(outputs,split_output_root,appendp);
760 
761     for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
762       if (outputs[split_output] != NULL) {
763 	fclose(outputs[split_output]);
764       }
765     }
766   } else if (output_file != NULL) {
767     fclose(outputs[0]);
768   } else {
769     /* Wrote to stdout */
770   }
771 #endif
772 
773   FREE_KEEP(outputs);
774 
775   return;
776 }
777 
778 
779 void
Outbuffer_free(T * old)780 Outbuffer_free (T *old) {
781 
782   if (*old) {
783 #ifdef HAVE_PTHREAD
784     pthread_cond_destroy(&(*old)->filestring_avail_p);
785     pthread_mutex_destroy(&(*old)->lock);
786 #endif
787 
788     FREE_KEEP(*old);
789   }
790 
791   return;
792 }
793 
794 
795 unsigned int
Outbuffer_nread(T this)796 Outbuffer_nread (T this) {
797   return this->nread;
798 }
799 
800 unsigned int
Outbuffer_nbeyond(T this)801 Outbuffer_nbeyond (T this) {
802   return this->nbeyond;
803 }
804 
805 
806 void
Outbuffer_add_nread(T this,unsigned int nread)807 Outbuffer_add_nread (T this, unsigned int nread) {
808 
809 #ifdef HAVE_PTHREAD
810   pthread_mutex_lock(&this->lock);
811 #endif
812 
813   if (nread == 0) {
814     /* Finished reading, so able to determine total reads in input */
815     this->ntotal = this->nread;
816     debug(fprintf(stderr,"__Outbuffer_add_nread added 0 reads, so setting ntotal to be %u\n",this->ntotal));
817 
818 #ifdef HAVE_PTHREAD
819     pthread_cond_signal(&this->filestring_avail_p);
820 #endif
821 
822   } else {
823     this->nread += nread;
824 #ifdef USE_MPI
825     this->ntotal = this->nread;
826 #endif
827     debug(fprintf(stderr,"__Outbuffer_add_nread added %d read, now %d\n",nread,this->nread));
828   }
829 
830 #ifdef HAVE_PTHREAD
831   pthread_mutex_unlock(&this->lock);
832 #endif
833 
834   return;
835 }
836 
837 
838 void
Outbuffer_add_nbeyond(T this)839 Outbuffer_add_nbeyond (T this) {
840 
841 #ifdef HAVE_PTHREAD
842   pthread_mutex_lock(&this->lock);
843 #endif
844 
845   this->nbeyond += 1;
846 
847 #ifdef HAVE_PTHREAD
848   pthread_cond_signal(&this->filestring_avail_p);
849   pthread_mutex_unlock(&this->lock);
850 #endif
851 
852   return;
853 }
854 
855 
856 #ifdef GSNAP
857 void
Outbuffer_put_filestrings(T this,Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)858 Outbuffer_put_filestrings (T this, Filestring_T fp, Filestring_T fp_failedinput, Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2) {
859 
860 #ifdef HAVE_PTHREAD
861   pthread_mutex_lock(&this->lock);
862 #endif
863 
864   this->tail = RRlist_push(&this->head,this->tail,fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
865   debug1(RRlist_dump(this->head,this->tail));
866   this->nprocessed += 1;
867 
868 #ifdef HAVE_PTHREAD
869   debug(printf("Signaling that filestring is available\n"));
870   pthread_cond_signal(&this->filestring_avail_p);
871   pthread_mutex_unlock(&this->lock);
872 #endif
873 
874   return;
875 }
876 
877 #else
878 void
Outbuffer_put_filestrings(T this,Filestring_T fp,Filestring_T fp_failedinput)879 Outbuffer_put_filestrings (T this, Filestring_T fp, Filestring_T fp_failedinput) {
880 
881 #ifdef HAVE_PTHREAD
882   pthread_mutex_lock(&this->lock);
883 #endif
884 
885   this->tail = RRlist_push(&this->head,this->tail,fp,fp_failedinput);
886   debug1(RRlist_dump(this->head,this->tail));
887   this->nprocessed += 1;
888 
889 #ifdef HAVE_PTHREAD
890   debug(printf("Signaling that filestring is available\n"));
891   pthread_cond_signal(&this->filestring_avail_p);
892   pthread_mutex_unlock(&this->lock);
893 #endif
894 
895   return;
896 }
897 #endif
898 
899 
900 
901 #ifdef GSNAP
902 void
Outbuffer_print_filestrings(Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)903 Outbuffer_print_filestrings (Filestring_T fp, Filestring_T fp_failedinput, Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2) {
904   SAM_split_output_type split_output;
905 #ifdef USE_MPI
906   MPI_File output;
907 #else
908   FILE *output;
909 #endif
910 
911 #ifdef USE_MPI
912   split_output = Filestring_split_output(fp);
913   output = outputs[split_output];
914 
915 #else
916   if (split_output_root != NULL) {
917     split_output = Filestring_split_output(fp);
918     if ((output = outputs[split_output]) == NULL) {
919       output = outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
920       if (split_output == OUTPUT_NONE) {
921 	/* Don't print file headers, since no output will go to
922 	   stdout.  Must be a nomapping when --nofails is specified */
923       } else {
924 	print_file_headers(output);
925       }
926     }
927   } else if ((output = outputs[0]) == NULL) {
928     if (output_file == NULL) {
929       output = outputs[0] = stdout;
930       print_file_headers(stdout);
931     } else {
932       output = outputs[0] = SAM_header_open_file(/*split_output*/OUTPUT_FILE,output_file,appendp);
933       print_file_headers(output);
934     }
935   }
936 #endif
937 
938 #ifdef USE_MPI
939   /* Prevents output from being broken up */
940   Filestring_stringify(fp);
941 #endif
942   Filestring_print(output,fp);
943   Filestring_free(&fp);
944 
945   if (failedinput_root != NULL) {
946     if (fp_failedinput != NULL) {
947 #ifdef USE_MPI
948       Filestring_stringify(fp_failedinput);
949 #endif
950       Filestring_print(output_failedinput,fp_failedinput);
951       Filestring_free(&fp_failedinput);
952     }
953     if (fp_failedinput_1 != NULL) {
954 #ifdef USE_MPI
955       Filestring_stringify(fp_failedinput_1);
956 #endif
957       Filestring_print(output_failedinput_1,fp_failedinput_1);
958       Filestring_free(&fp_failedinput_1);
959     }
960     if (fp_failedinput_2 != NULL) {
961 #ifdef USE_MPI
962       Filestring_stringify(fp_failedinput_2);
963 #endif
964       Filestring_print(output_failedinput_2,fp_failedinput_2);
965       Filestring_free(&fp_failedinput_2);
966     }
967   }
968 
969   return;
970 }
971 
972 #else
973 void
Outbuffer_print_filestrings(Filestring_T fp,Filestring_T fp_failedinput)974 Outbuffer_print_filestrings (Filestring_T fp, Filestring_T fp_failedinput) {
975   SAM_split_output_type split_output;
976 #ifdef USE_MPI
977   MPI_File output;
978 #else
979   FILE *output;
980 #endif
981 
982 #ifdef USE_MPI
983   split_output = Filestring_split_output(fp);
984   output = outputs[split_output];
985 
986 #else
987   if (split_output_root != NULL) {
988     split_output = Filestring_split_output(fp);
989     if ((output = outputs[split_output]) == NULL) {
990       output = outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
991       if (split_output == OUTPUT_NONE && split_output_root != NULL) {
992 	/* Don't print file headers, since no output will go to
993 	   stdout.  Must be a nomapping when --nofails is specified */
994       } else {
995 	print_file_headers(output);
996       }
997     }
998 
999   } else if ((output = outputs[0]) == NULL) {
1000     if (output_file == NULL) {
1001       output = outputs[0] = stdout;
1002       print_file_headers(stdout);
1003     } else {
1004       output = outputs[0] = SAM_header_open_file(/*split_output*/OUTPUT_FILE,output_file,appendp);
1005       print_file_headers(output);
1006     }
1007   }
1008 #endif
1009 
1010 #ifdef USE_MPI
1011   Filestring_stringify(fp);
1012 #endif
1013   Filestring_print(output,fp);
1014   Filestring_free(&fp);
1015 
1016   if (failedinput_root != NULL) {
1017     if (fp_failedinput != NULL) {
1018 #ifdef USE_MPI
1019       Filestring_stringify(fp_failedinput);
1020 #endif
1021       Filestring_print(output_failedinput,fp_failedinput);
1022       Filestring_free(&fp_failedinput);
1023     }
1024   }
1025 
1026   return;
1027 }
1028 
1029 #endif
1030 
1031 
1032 
1033 void *
Outbuffer_thread_anyorder(void * data)1034 Outbuffer_thread_anyorder (void *data) {
1035   T this = (T) data;
1036   unsigned int output_buffer_size = this->output_buffer_size;
1037   unsigned int noutput = 0, ntotal, nbeyond;
1038   Filestring_T fp;
1039   Filestring_T fp_failedinput;
1040 #ifdef GSNAP
1041   Filestring_T fp_failedinput_1, fp_failedinput_2;
1042 #endif
1043 
1044   /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
1045 #ifdef HAVE_PTHREAD
1046   pthread_mutex_lock(&this->lock);
1047 #endif
1048   ntotal = this->ntotal;
1049   nbeyond = this->nbeyond;
1050 #ifdef HAVE_PTHREAD
1051   pthread_mutex_unlock(&this->lock);
1052 #endif
1053 
1054   while (noutput + nbeyond < ntotal) {	/* Previously check against this->ntotal */
1055 #ifdef HAVE_PTHREAD
1056     pthread_mutex_lock(&this->lock);
1057     while (this->head == NULL && noutput + this->nbeyond < this->ntotal) {
1058       debug(fprintf(stderr,"__outbuffer_thread_anyorder waiting for filestring_avail_p\n"));
1059       pthread_cond_wait(&this->filestring_avail_p,&this->lock);
1060     }
1061     debug(fprintf(stderr,"__outbuffer_thread_anyorder woke up\n"));
1062 #endif
1063 
1064     if (this->head == NULL) {
1065       /* False wake up */
1066 #ifdef HAVE_PTHREAD
1067       pthread_mutex_unlock(&this->lock);
1068 #endif
1069 
1070     } else {
1071 #ifdef GSNAP
1072       this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1073 #else
1074       this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
1075 #endif
1076       debug1(RRlist_dump(this->head,this->tail));
1077 
1078 #ifdef HAVE_PTHREAD
1079       /* Let worker threads put filestrings while we print */
1080       pthread_mutex_unlock(&this->lock);
1081 #endif
1082 
1083 #ifdef GSNAP
1084       Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1085 #else
1086       Outbuffer_print_filestrings(fp,fp_failedinput);
1087 #endif
1088       noutput += 1;
1089       /* Result_free(&result); */
1090       /* Request_free(&request); */
1091 
1092 #ifdef HAVE_PTHREAD
1093       pthread_mutex_lock(&this->lock);
1094 #endif
1095       if (this->head && this->nprocessed - noutput > output_buffer_size) {
1096 	/* Clear out backlog */
1097 	while (this->head && this->nprocessed - noutput > output_buffer_size) {
1098 #ifdef GSNAP
1099 	  this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1100 #else
1101 	  this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
1102 #endif
1103 	  debug1(RRlist_dump(this->head,this->tail));
1104 
1105 #ifdef GSNAP
1106 	  Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1107 #else
1108 	  Outbuffer_print_filestrings(fp,fp_failedinput);
1109 #endif
1110 	  noutput += 1;
1111 	  /* Result_free(&result); */
1112 	  /* Request_free(&request); */
1113 	}
1114       }
1115 #ifdef HAVE_PTHREAD
1116       pthread_mutex_unlock(&this->lock);
1117 #endif
1118     }
1119 
1120     debug(fprintf(stderr,"__outbuffer_thread_anyorder has noutput %d, nbeyond %d, ntotal %d\n",
1121 		  noutput,nbeyond,ntotal));
1122 
1123     /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
1124 #ifdef HAVE_PTHREAD
1125     pthread_mutex_lock(&this->lock);
1126 #endif
1127     ntotal = this->ntotal;
1128     nbeyond = this->nbeyond;
1129 #ifdef HAVE_PTHREAD
1130     pthread_mutex_unlock(&this->lock);
1131 #endif
1132   }
1133 
1134   assert(this->head == NULL);
1135 
1136   return (void *) NULL;
1137 }
1138 
1139 
1140 
1141 void *
Outbuffer_thread_ordered(void * data)1142 Outbuffer_thread_ordered (void *data) {
1143   T this = (T) data;
1144   unsigned int output_buffer_size = this->output_buffer_size;
1145   unsigned int noutput = 0, nqueued = 0, ntotal, nbeyond;
1146   Filestring_T fp;
1147   Filestring_T fp_failedinput;
1148 #ifdef GSNAP
1149   Filestring_T fp_failedinput_1, fp_failedinput_2;
1150 #endif
1151   RRlist_T queue = NULL;
1152   int id;
1153 
1154   /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
1155 #ifdef HAVE_PTHREAD
1156   pthread_mutex_lock(&this->lock);
1157 #endif
1158   ntotal = this->ntotal;
1159   nbeyond = this->nbeyond;
1160 #ifdef HAVE_PTHREAD
1161   pthread_mutex_unlock(&this->lock);
1162 #endif
1163 
1164   while (noutput + nbeyond < ntotal) {	/* Previously checked against this->ntotal */
1165 #ifdef HAVE_PTHREAD
1166     pthread_mutex_lock(&this->lock);
1167     while (this->head == NULL && noutput + this->nbeyond < this->ntotal) {
1168       pthread_cond_wait(&this->filestring_avail_p,&this->lock);
1169     }
1170     debug(fprintf(stderr,"__outbuffer_thread_ordered woke up\n"));
1171 #endif
1172 
1173     if (this->head == NULL) {
1174 #ifdef HAVE_PTHREAD
1175       /* False wake up, or signal from worker_mpi_process */
1176       ntotal = this->ntotal;
1177       nbeyond = this->nbeyond;
1178       pthread_mutex_unlock(&this->lock);
1179 #endif
1180 
1181     } else {
1182 #ifdef GSNAP
1183       this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1184 #else
1185       this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
1186 #endif
1187 
1188 #ifdef HAVE_PTHREAD
1189       /* Allow workers access to the queue */
1190       pthread_mutex_unlock(&this->lock);
1191 #endif
1192       if ((id = Filestring_id(fp)) != (int) noutput) {
1193 	/* Store in queue */
1194 #ifdef GSNAP
1195 	queue = RRlist_insert(queue,id,fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1196 #else
1197 	queue = RRlist_insert(queue,id,fp,fp_failedinput);
1198 #endif
1199 	nqueued++;
1200       } else {
1201 #ifdef GSNAP
1202 	Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1203 #else
1204 	Outbuffer_print_filestrings(fp,fp_failedinput);
1205 #endif
1206 	noutput += 1;
1207 
1208 	/* Result_free(&result); */
1209 	/* Request_free(&request); */
1210 
1211 	/* Print out rest of stored queue */
1212 	while (queue != NULL && queue->id == (int) noutput) {
1213 #ifdef GSNAP
1214 	  queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1215 #else
1216 	  queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput);
1217 #endif
1218 	  nqueued--;
1219 #ifdef GSNAP
1220 	  Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1221 #else
1222 	  Outbuffer_print_filestrings(fp,fp_failedinput);
1223 #endif
1224 	  noutput += 1;
1225 
1226 	  /* Result_free(&result); */
1227 	  /* Request_free(&request); */
1228 	}
1229       }
1230 
1231 #ifdef HAVE_PTHREAD
1232       pthread_mutex_lock(&this->lock);
1233 #endif
1234       if (this->head && this->nprocessed - nqueued - noutput > output_buffer_size) {
1235 	/* Clear out backlog */
1236 	while (this->head && this->nprocessed - nqueued - noutput > output_buffer_size) {
1237 #ifdef GSNAP
1238 	  this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1239 #else
1240 	  this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
1241 #endif
1242 	  if ((id = Filestring_id(fp)) != (int) noutput) {
1243 	    /* Store in queue */
1244 #ifdef GSNAP
1245 	    queue = RRlist_insert(queue,id,fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1246 #else
1247 	    queue = RRlist_insert(queue,id,fp,fp_failedinput);
1248 #endif
1249 	    nqueued++;
1250 	  } else {
1251 #ifdef GSNAP
1252 	    Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1253 #else
1254 	    Outbuffer_print_filestrings(fp,fp_failedinput);
1255 #endif
1256 	    noutput += 1;
1257 	    /* Result_free(&result); */
1258 	    /* Request_free(&request); */
1259 
1260 	    /* Print out rest of stored queue */
1261 	    while (queue != NULL && queue->id == (int) noutput) {
1262 #ifdef GSNAP
1263 	      queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1264 #else
1265 	      queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput);
1266 #endif
1267 	      nqueued--;
1268 #ifdef GSNAP
1269 	      Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1270 #else
1271 	      Outbuffer_print_filestrings(fp,fp_failedinput);
1272 #endif
1273 	      noutput += 1;
1274 	      /* Result_free(&result); */
1275 	      /* Request_free(&request); */
1276 	    }
1277 	  }
1278 	}
1279       }
1280 
1281 #ifdef HAVE_PTHREAD
1282       pthread_mutex_unlock(&this->lock);
1283 #endif
1284     }
1285 
1286     debug(fprintf(stderr,"__outbuffer_thread_ordered has noutput %d, nbeyond %d, ntotal %d\n",
1287 		  noutput,nbeyond,ntotal));
1288 
1289     /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
1290 #ifdef HAVE_PTHREAD
1291     pthread_mutex_lock(&this->lock);
1292 #endif
1293     ntotal = this->ntotal;
1294     nbeyond = this->nbeyond;
1295 #ifdef HAVE_PTHREAD
1296     pthread_mutex_unlock(&this->lock);
1297 #endif
1298   }
1299 
1300   assert(queue == NULL);
1301 
1302   return (void *) NULL;
1303 }
1304 
1305