/* Subversion $Id$ keyword string identifying the revision of this source file */
static char rcsid[] = "$Id: outbuffer.c 218147 2019-01-16 21:28:41Z twu $";
2 #ifdef HAVE_CONFIG_H
3 #include <config.h>
4 #endif
5
6 #include "outbuffer.h"
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10
11 #ifdef HAVE_PTHREAD
12 #ifdef HAVE_SYS_TYPES_H
13 #include <sys/types.h> /* Needed to define pthread_t on Solaris */
14 #endif
15 #include <pthread.h>
16 #endif
17
18 #include "assert.h"
19 #include "bool.h"
20 #include "mem.h"
21 #include "samheader.h"
22
23
/* Debugging switches: each debugX(x) macro expands to its argument only
   when the matching DEBUGX symbol is defined, and to nothing otherwise. */

/* MPI processing */
#ifdef DEBUGM
#define debugm(x) x
#else
#define debugm(x)
#endif

#ifdef DEBUG
#define debug(x) x
#else
#define debug(x)
#endif

#ifdef DEBUG1
#define debug1(x) x
#else
#define debug1(x)
#endif


/* sam-to-bam conversions always need the headers */
/* When defined, output files receive their header lines even if no
   records are ever written to them. */
#define SAM_HEADERS_ON_EMPTY_FILES 1

/* Module-wide configuration, copied from the arguments of Outbuffer_setup */

/* Command line, used when writing the @PG and GFF header lines */
static int argc;
static char **argv;
static int optind_save;

static Univ_IIT_T chromosome_iit;
static bool any_circular_p;          /* Gates touching of the circular split outputs at close */
static int nworkers;
static bool orderedp;
static bool quiet_if_excessive_p;    /* Gates touching of the remaining split outputs at close */

#ifdef GSNAP
static Outputtype_T output_type;
#else
static Printtype_T printtype;
static Sequence_T usersegment;       /* When non-NULL, SAM headers describe this segment rather than chromosome_iit */
#endif

/* SAM header options */
static bool sam_headers_p;
static char *sam_read_group_id;
static char *sam_read_group_name;
static char *sam_read_group_library;
static char *sam_read_group_platform;

/* Output destinations */
static bool appendp;                 /* Append to existing files instead of truncating them */
static char *output_file;            /* Single output file; with no split root, NULL means stdout */
static char *split_output_root;      /* When non-NULL, results go to one file per SAM_split_output_type */
static char *failedinput_root;       /* When non-NULL, failed inputs are also written to files with this root */

#ifdef USE_MPI
/* Indexed by SAM_split_output_type; slot 0 is the single non-split output.
   All are opened up front in Outbuffer_setup. */
static MPI_File *outputs;
static MPI_File output_failedinput;
#ifdef GSNAP
static MPI_File output_failedinput_1;
static MPI_File output_failedinput_2;
#endif


#else
static char *write_mode;             /* fopen mode: "a" when appendp, else "w" */
/* Indexed by SAM_split_output_type; slot 0 is the single non-split output.
   Entries stay NULL until the output thread first needs them. */
static FILE **outputs = NULL;
static FILE *output_failedinput;
#ifdef GSNAP
static FILE *output_failedinput_1;
static FILE *output_failedinput_2;
#endif

#endif
94
95
96 #ifndef GSNAP
97 /* Taken from Univ_IIT_dump_sam */
98 static void
dump_sam_usersegment(FILE * fp,Sequence_T usersegment,char * sam_read_group_id,char * sam_read_group_name,char * sam_read_group_library,char * sam_read_group_platform)99 dump_sam_usersegment (FILE *fp, Sequence_T usersegment,
100 char *sam_read_group_id, char *sam_read_group_name,
101 char *sam_read_group_library, char *sam_read_group_platform) {
102
103 fprintf(fp,"@SQ\tSN:%s",Sequence_accession(usersegment));
104 fprintf(fp,"\tLN:%u",Sequence_fulllength(usersegment));
105 fprintf(fp,"\n");
106
107 if (sam_read_group_id != NULL) {
108 fprintf(fp,"@RG\tID:%s",sam_read_group_id);
109 if (sam_read_group_platform != NULL) {
110 fprintf(fp,"\tPL:%s",sam_read_group_platform);
111 }
112 if (sam_read_group_library != NULL) {
113 fprintf(fp,"\tLB:%s",sam_read_group_library);
114 }
115 fprintf(fp,"\tSM:%s",sam_read_group_name);
116 fprintf(fp,"\n");
117 }
118
119 return;
120 }
121 #endif
122
123
124 #ifndef GSNAP
125 static void
print_gff_header(FILE * fp,int argc,char ** argv,int optind)126 print_gff_header (FILE *fp, int argc, char **argv, int optind) {
127 char **argstart;
128 int c;
129
130 fprintf(fp,"##gff-version 3\n");
131 fprintf(fp,"# Generated by GMAP version %s using call: ",PACKAGE_VERSION);
132 argstart = &(argv[-optind]);
133 for (c = 0; c < argc + optind; c++) {
134 fprintf(fp," %s",argstart[c]);
135 }
136 fprintf(fp,"\n");
137 return;
138 }
139 #endif
140
141
142 static void
print_file_headers(MPI_File output)143 print_file_headers (
144 #ifdef USE_MPI
145 MPI_File output
146 #else
147 FILE *output
148 #endif
149 ) {
150
151 if (output == NULL) {
152 /* Possible since we are no longer creating a file for OUTPUT_NONE */
153 return;
154 }
155
156 #ifdef GSNAP
157 if (output_type == SAM_OUTPUT && sam_headers_p == true) {
158 SAM_header_print_HD(output,nworkers,orderedp);
159 SAM_header_print_PG(output,argc,argv,optind_save);
160 Univ_IIT_dump_sam(output,chromosome_iit,sam_read_group_id,sam_read_group_name,
161 sam_read_group_library,sam_read_group_platform);
162 }
163
164 #else
165 if (printtype == GFF3_GENE || printtype == GFF3_MATCH_CDNA || printtype == GFF3_MATCH_EST) {
166 print_gff_header(output,argc,argv,optind_save);
167
168 #ifndef PMAP
169 } else if (printtype == SAM && sam_headers_p == true) {
170 if (usersegment != NULL) {
171 dump_sam_usersegment(output,usersegment,sam_read_group_id,sam_read_group_name,
172 sam_read_group_library,sam_read_group_platform);
173 } else {
174 SAM_header_print_HD(output,nworkers,orderedp);
175 SAM_header_print_PG(output,argc,argv,optind_save);
176 Univ_IIT_dump_sam(output,chromosome_iit,sam_read_group_id,sam_read_group_name,
177 sam_read_group_library,sam_read_group_platform);
178 }
179 #endif
180
181 }
182 #endif
183
184 return;
185 }
186
187
188 static void
failedinput_open(char * failedinput_root)189 failedinput_open (char *failedinput_root) {
190 char *filename;
191
192 #ifdef GSNAP
193 filename = (char *) MALLOC((strlen(failedinput_root)+strlen(".1")+1) * sizeof(char));
194 sprintf(filename,"%s.1",failedinput_root);
195
196 #ifdef USE_MPI
197 if (appendp == true) {
198 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
199 MPI_INFO_NULL,&output_failedinput_1);
200 } else {
201 /* Need to remove existing file, if any */
202 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
203 MPI_INFO_NULL,&output_failedinput_1);
204 MPI_File_close(&output_failedinput_1);
205 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
206 MPI_INFO_NULL,&output_failedinput_1);
207 }
208 #else
209 if ((output_failedinput_1 = fopen(filename,write_mode)) == NULL) {
210 fprintf(stderr,"Cannot open file %s for writing\n",filename);
211 exit(9);
212 }
213 #endif
214
215 /* Re-use filename, since it is the same length */
216 sprintf(filename,"%s.2",failedinput_root);
217 #ifdef USE_MPI
218 if (appendp == true) {
219 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
220 MPI_INFO_NULL,&output_failedinput_2);
221 } else {
222 /* Need to remove existing file, if any */
223 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
224 MPI_INFO_NULL,&output_failedinput_2);
225 MPI_File_close(&output_failedinput_2);
226 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
227 MPI_INFO_NULL,&output_failedinput_2);
228 }
229 #else
230 if ((output_failedinput_2 = fopen(filename,write_mode)) == NULL) {
231 fprintf(stderr,"Cannot open file %s for writing\n",filename);
232 exit(9);
233 }
234 #endif
235
236 /* Re-use filename, since it is shorter */
237 sprintf(filename,"%s",failedinput_root);
238 #ifdef USE_MPI
239 if (appendp == true) {
240 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
241 MPI_INFO_NULL,&output_failedinput);
242 } else {
243 /* Need to remove existing file, if any */
244 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
245 MPI_INFO_NULL,&output_failedinput);
246 MPI_File_close(&output_failedinput);
247 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
248 MPI_INFO_NULL,&output_failedinput);
249 }
250 #else
251 if ((output_failedinput = fopen(filename,write_mode)) == NULL) {
252 fprintf(stderr,"Cannot open file %s for writing\n",filename);
253 exit(9);
254 }
255 #endif
256 FREE(filename);
257
258
259 #else /* GMAP */
260 filename = (char *) MALLOC((strlen(failedinput_root)+1) * sizeof(char));
261 sprintf(filename,"%s",failedinput_root);
262 #ifdef USE_MPI
263 if (appendp == true) {
264 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_APPEND,
265 MPI_INFO_NULL,&output_failedinput);
266 } else {
267 /* Need to remove existing file, if any */
268 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_DELETE_ON_CLOSE,
269 MPI_INFO_NULL,&output_failedinput);
270 MPI_File_close(&output_failedinput);
271 MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_CREATE | MPI_MODE_WRONLY,
272 MPI_INFO_NULL,&output_failedinput);
273 }
274 #else
275 if ((output_failedinput = fopen(filename,write_mode)) == NULL) {
276 fprintf(stderr,"Cannot open file %s for writing\n",filename);
277 exit(9);
278 }
279 #endif
280 FREE(filename);
281
282 #endif /* GSNAP */
283
284 return;
285 }
286
287
288 void
Outbuffer_setup(int argc_in,char ** argv_in,int optind_in,Univ_IIT_T chromosome_iit_in,bool any_circular_p_in,int nworkers_in,bool orderedp_in,bool quiet_if_excessive_p_in,Outputtype_T output_type_in,bool sam_headers_p_in,char * sam_read_group_id_in,char * sam_read_group_name_in,char * sam_read_group_library_in,char * sam_read_group_platform_in,bool appendp_in,char * output_file_in,char * split_output_root_in,char * failedinput_root_in)289 Outbuffer_setup (int argc_in, char **argv_in, int optind_in,
290 Univ_IIT_T chromosome_iit_in, bool any_circular_p_in,
291 int nworkers_in, bool orderedp_in, bool quiet_if_excessive_p_in,
292 #ifdef GSNAP
293 Outputtype_T output_type_in,
294 #else
295 Printtype_T printtype_in, Sequence_T usersegment_in,
296 #endif
297 bool sam_headers_p_in, char *sam_read_group_id_in, char *sam_read_group_name_in,
298 char *sam_read_group_library_in, char *sam_read_group_platform_in,
299 bool appendp_in, char *output_file_in, char *split_output_root_in, char *failedinput_root_in) {
300 #ifdef USE_MPI
301 SAM_split_output_type split_output;
302 #endif
303
304
305 argc = argc_in;
306 argv = argv_in;
307 optind_save = optind_in;
308
309 chromosome_iit = chromosome_iit_in;
310 any_circular_p = any_circular_p_in;
311
312 nworkers = nworkers_in;
313 orderedp = orderedp_in;
314 quiet_if_excessive_p = quiet_if_excessive_p_in;
315
316 #ifdef GSNAP
317 output_type = output_type_in;
318 #else
319 printtype = printtype_in;
320 usersegment = usersegment_in;
321 #endif
322
323 sam_headers_p = sam_headers_p_in;
324 sam_read_group_id = sam_read_group_id_in;
325 sam_read_group_name = sam_read_group_name_in;
326 sam_read_group_library = sam_read_group_library_in;
327 sam_read_group_platform = sam_read_group_platform_in;
328
329 appendp = appendp_in;
330 split_output_root = split_output_root_in;
331 output_file = output_file_in;
332
333
334 /************************************************************************/
335 /* Output files */
336 /************************************************************************/
337
338 #ifdef USE_MPI
339 /* All processes need to run MPI_File_open, and need to open all files now */
340 outputs = (MPI_File *) CALLOC_KEEP(1+N_SPLIT_OUTPUTS,sizeof(MPI_File));
341 if (split_output_root != NULL) {
342 for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
343 outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
344 #ifdef SAM_HEADERS_ON_EMPTY_FILES
345 print_file_headers(outputs[split_output]);
346 #endif
347 }
348
349 } else if (output_file != NULL) {
350 outputs[0] = SAM_header_open_file(OUTPUT_FILE,/*split_output_root*/output_file,appendp);
351 #ifdef SAM_HEADERS_ON_EMPTY_FILES
352 print_file_headers(outputs[0]);
353 #endif
354 for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
355 outputs[split_output] = outputs[0];
356 }
357
358 } else {
359 /* Write to stdout */
360 outputs[0] = (MPI_File) NULL;
361 #ifdef SAM_HEADERS_ON_EMPTY_FILES
362 print_file_headers(outputs[0]);
363 #endif
364 for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
365 outputs[split_output] = (MPI_File) NULL;
366 }
367 }
368
369 #else
370 /* Only the output thread needs to run fopen, and can open files when needed */
371 if (appendp == true) {
372 write_mode = "a";
373 } else {
374 write_mode = "w";
375 }
376 outputs = (FILE **) CALLOC_KEEP(1+N_SPLIT_OUTPUTS,sizeof(FILE *));
377 #endif
378
379
380 /************************************************************************/
381 /* Failed input files */
382 /************************************************************************/
383
384 failedinput_root = failedinput_root_in;
385 if (failedinput_root == NULL) {
386 output_failedinput = NULL;
387 #ifdef GSNAP
388 output_failedinput_1 = output_failedinput_2 = NULL;
389 #endif
390 } else {
391 failedinput_open(failedinput_root);
392 }
393
394 return;
395 }
396
397
398 void
Outbuffer_cleanup()399 Outbuffer_cleanup () {
400 FREE_KEEP(outputs); /* Matches CALLOC_KEEP in Outbuffer_setup */
401 return;
402 }
403
404
/* Singly linked list node carrying the filestrings produced for one unit
   of work (presumably one input read): fp is the main result, and the
   fp_failedinput* fields are failed-input records, each of which may be
   NULL. */
typedef struct RRlist_T *RRlist_T;
struct RRlist_T {
  int id;                          /* Ordering key; set by RRlist_insert, left unset by RRlist_push */
  Filestring_T fp;
  Filestring_T fp_failedinput;
#ifdef GSNAP
  Filestring_T fp_failedinput_1;
  Filestring_T fp_failedinput_2;
#endif
  RRlist_T next;
};
416
417
#ifdef DEBUG1
/* Debugging aid: prints the head pointer, every node with its successor,
   and the tail pointer.  Each pointer is converted to void * because %p
   is defined only for void * arguments. */
static void
RRlist_dump (RRlist_T head, RRlist_T tail) {
  RRlist_T this;

  printf("head %p\n",(void *) head);
  for (this = head; this != NULL; this = this->next) {
    printf("%p: next %p\n",(void *) this,(void *) this->next);
  }
  printf("tail %p\n",(void *) tail);
  printf("\n");
  return;
}
#endif
432
433
434 /* Returns new tail */
435 static RRlist_T
RRlist_push(RRlist_T * head,RRlist_T tail,Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)436 RRlist_push (RRlist_T *head, RRlist_T tail, Filestring_T fp,
437 Filestring_T fp_failedinput
438 #ifdef GSNAP
439 , Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2
440 #endif
441 ) {
442 RRlist_T new;
443
444 new = (RRlist_T) MALLOC_OUT(sizeof(*new)); /* Called by worker thread */
445 new->fp = fp;
446 new->fp_failedinput = fp_failedinput;
447 #ifdef GSNAP
448 new->fp_failedinput_1 = fp_failedinput_1;
449 new->fp_failedinput_2 = fp_failedinput_2;
450 #endif
451 new->next = (RRlist_T) NULL;
452
453 if (*head == NULL) { /* Equivalent to tail == NULL, but using *head avoids having to set tail in RRlist_pop */
454 *head = new;
455 } else {
456 tail->next = new;
457 }
458
459 return new;
460 }
461
462
463 /* Returns new head */
464 static RRlist_T
RRlist_pop(RRlist_T head,Filestring_T * fp,Filestring_T * fp_failedinput,Filestring_T * fp_failedinput_1,Filestring_T * fp_failedinput_2)465 RRlist_pop (RRlist_T head, Filestring_T *fp,
466 Filestring_T *fp_failedinput
467 #ifdef GSNAP
468 , Filestring_T *fp_failedinput_1, Filestring_T *fp_failedinput_2
469 #endif
470 ) {
471 RRlist_T newhead;
472
473 *fp = head->fp;
474 *fp_failedinput = head->fp_failedinput;
475 #ifdef GSNAP
476 *fp_failedinput_1 = head->fp_failedinput_1;
477 *fp_failedinput_2 = head->fp_failedinput_2;
478 #endif
479
480 newhead = head->next;
481
482 FREE_OUT(head); /* Called by outbuffer thread */
483 return newhead;
484 }
485
486
487 static RRlist_T
RRlist_insert(RRlist_T list,int id,Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)488 RRlist_insert (RRlist_T list, int id, Filestring_T fp,
489 Filestring_T fp_failedinput
490 #ifdef GSNAP
491 , Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2
492 #endif
493 ) {
494 RRlist_T *p;
495 RRlist_T new;
496
497 p = &list;
498 while (*p != NULL && id > (*p)->id) {
499 p = &(*p)->next;
500 }
501
502 new = (RRlist_T) MALLOC_OUT(sizeof(*new));
503 new->id = id;
504 new->fp = fp;
505 new->fp_failedinput = fp_failedinput;
506 #ifdef GSNAP
507 new->fp_failedinput_1 = fp_failedinput_1;
508 new->fp_failedinput_2 = fp_failedinput_2;
509 #endif
510
511 new->next = *p;
512 *p = new;
513 return list;
514 }
515
516 /* Returns new head */
517 static RRlist_T
RRlist_pop_id(RRlist_T head,int * id,Filestring_T * fp,Filestring_T * fp_failedinput,Filestring_T * fp_failedinput_1,Filestring_T * fp_failedinput_2)518 RRlist_pop_id (RRlist_T head, int *id, Filestring_T *fp,
519 Filestring_T *fp_failedinput
520 #ifdef GSNAP
521 , Filestring_T *fp_failedinput_1, Filestring_T *fp_failedinput_2
522 #endif
523 ) {
524 RRlist_T newhead;
525
526 *id = head->id;
527 *fp = head->fp;
528 *fp_failedinput = head->fp_failedinput;
529 #ifdef GSNAP
530 *fp_failedinput_1 = head->fp_failedinput_1;
531 *fp_failedinput_2 = head->fp_failedinput_2;
532 #endif
533
534 newhead = head->next;
535
536 FREE_OUT(head); /* Called by outbuffer thread */
537 return newhead;
538 }
539
540
#define T Outbuffer_T
/* Hand-off point between the worker threads, which queue finished
   filestrings (Outbuffer_put_filestrings), and the output thread, which
   prints them (Outbuffer_thread_anyorder / Outbuffer_thread_ordered). */
struct T {

#ifdef HAVE_PTHREAD
  pthread_mutex_t lock;            /* Held while the counters and queue below are updated */
#endif

  unsigned int output_buffer_size; /* Backlog (nprocessed minus printed) above which the output thread drains the queue while holding lock */
  unsigned int nread;              /* Inputs read so far, accumulated by Outbuffer_add_nread */
  unsigned int ntotal;             /* (unsigned int) -1 until all reads are input; under MPI it tracks nread */
  unsigned int nbeyond; /* MPI request that is beyond the given inputs */
  unsigned int nprocessed;         /* Filestrings queued so far by Outbuffer_put_filestrings */

  RRlist_T head;                   /* FIFO of queued filestrings awaiting output */
  RRlist_T tail;

#ifdef HAVE_PTHREAD
  pthread_cond_t filestring_avail_p; /* Signaled when a filestring is queued, ntotal becomes known, or nbeyond grows */
#endif
};
561
562
563 /************************************************************************
564 * File routines
565 ************************************************************************/
566
567
568 T
Outbuffer_new(unsigned int output_buffer_size,unsigned int nread)569 Outbuffer_new (unsigned int output_buffer_size, unsigned int nread) {
570 T new = (T) MALLOC_KEEP(sizeof(*new));
571
572 #ifdef HAVE_PTHREAD
573 pthread_mutex_init(&new->lock,NULL);
574 #endif
575
576 new->output_buffer_size = output_buffer_size;
577 new->nread = nread;
578
579 /* Set to infinity until all reads are input. Used for Pthreads version */
580 new->ntotal = (unsigned int) -1;
581
582 new->nbeyond = 0;
583 new->nprocessed = 0;
584
585 new->head = (RRlist_T) NULL;
586 new->tail = (RRlist_T) NULL;
587
588 #ifdef HAVE_PTHREAD
589 pthread_cond_init(&new->filestring_avail_p,NULL);
590 #endif
591
592 return new;
593 }
594
595
596
597 #ifndef USE_MPI
598 /* Open empty files, and add SAM headers if SAM_HEADERS_ON_EMPTY_FILES is set */
599 static void
touch_all_single_outputs(FILE ** outputs,char * split_output_root,bool appendp)600 touch_all_single_outputs (FILE **outputs, char *split_output_root, bool appendp) {
601 SAM_split_output_type split_output;
602
603 split_output = 1;
604 while (split_output <= N_SPLIT_OUTPUTS_SINGLE_STD) {
605 if (outputs[split_output] == NULL) {
606 outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
607 #ifdef SAM_HEADERS_ON_EMPTY_FILES
608 print_file_headers(outputs[split_output]);
609 #endif
610 }
611 split_output++;
612 }
613
614 if (any_circular_p == false) {
615 split_output = N_SPLIT_OUTPUTS_SINGLE_TOCIRC + 1;
616 } else {
617 while (split_output <= N_SPLIT_OUTPUTS_SINGLE_TOCIRC) {
618 if (outputs[split_output] == NULL) {
619 outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
620 #ifdef SAM_HEADERS_ON_EMPTY_FILES
621 print_file_headers(outputs[split_output]);
622 #endif
623 }
624 split_output++;
625 }
626 }
627
628 if (quiet_if_excessive_p == true) {
629 while (split_output <= N_SPLIT_OUTPUTS_SINGLE) {
630 if (outputs[split_output] == NULL) {
631 outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
632 #ifdef SAM_HEADERS_ON_EMPTY_FILES
633 print_file_headers(outputs[split_output]);
634 #endif
635 }
636 split_output++;
637 }
638 }
639
640 return;
641 }
642 #endif
643
644
645 #ifndef USE_MPI
646 /* Open empty files, and add SAM headers if SAM_HEADERS_ON_EMPTY_FILES is set */
647 static void
touch_all_paired_outputs(FILE ** outputs,char * split_output_root,bool appendp)648 touch_all_paired_outputs (FILE **outputs, char *split_output_root, bool appendp) {
649 SAM_split_output_type split_output;
650
651 split_output = N_SPLIT_OUTPUTS_SINGLE + 1;
652 while (split_output <= N_SPLIT_OUTPUTS_STD) {
653 if (outputs[split_output] == NULL) {
654 outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
655 #ifdef SAM_HEADERS_ON_EMPTY_FILES
656 print_file_headers(outputs[split_output]);
657 #endif
658 }
659 split_output++;
660 }
661
662 if (any_circular_p == false) {
663 split_output = N_SPLIT_OUTPUTS_TOCIRC + 1;
664 } else {
665 while (split_output <= N_SPLIT_OUTPUTS_TOCIRC) {
666 if (outputs[split_output] == NULL) {
667 outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
668 #ifdef SAM_HEADERS_ON_EMPTY_FILES
669 print_file_headers(outputs[split_output]);
670 #endif
671 }
672 split_output++;
673 }
674 }
675
676 if (quiet_if_excessive_p == true) {
677 while (split_output <= N_SPLIT_OUTPUTS) {
678 if (outputs[split_output] == NULL) {
679 outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
680 #ifdef SAM_HEADERS_ON_EMPTY_FILES
681 print_file_headers(outputs[split_output]);
682 #endif
683 }
684 split_output++;
685 }
686 }
687
688 return;
689 }
690 #endif
691
692
693 #ifndef USE_MPI
694 static bool
paired_outputs_p(FILE ** outputs)695 paired_outputs_p (FILE **outputs) {
696 SAM_split_output_type split_output;
697
698 split_output = N_SPLIT_OUTPUTS_SINGLE + 1;
699 while (split_output <= N_SPLIT_OUTPUTS) {
700 if (outputs[split_output] != NULL) {
701 return true;
702 }
703 split_output++;
704 }
705
706 return false;
707 }
708 #endif
709
710
711 #ifndef USE_MPI
712 static void
touch_all_files(FILE ** outputs,char * split_output_root,bool appendp)713 touch_all_files (FILE **outputs, char *split_output_root, bool appendp) {
714 touch_all_single_outputs(outputs,split_output_root,appendp);
715 if (paired_outputs_p(outputs) == true) {
716 touch_all_paired_outputs(outputs,split_output_root,appendp);
717 }
718 return;
719 }
720 #endif
721
722
723
724 void
Outbuffer_close_files()725 Outbuffer_close_files () {
726 SAM_split_output_type split_output;
727
728 if (failedinput_root != NULL) {
729 #ifdef USE_MPI
730 MPI_File_close(&output_failedinput);
731 #ifdef GSNAP
732 MPI_File_close(&output_failedinput_1);
733 MPI_File_close(&output_failedinput_2);
734 #endif
735
736 #else
737 fclose(output_failedinput);
738 #ifdef GSNAP
739 fclose(output_failedinput_1);
740 fclose(output_failedinput_2);
741 #endif
742 #endif
743
744 }
745
746 #ifdef USE_MPI
747 if (split_output_root != NULL) {
748 for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
749 MPI_File_close(&(outputs[split_output]));
750 }
751 } else if (output_file != NULL) {
752 MPI_File_close(&(outputs[0]));
753 } else {
754 /* Wrote to stdout */
755 }
756
757 #else
758 if (split_output_root != NULL) {
759 touch_all_files(outputs,split_output_root,appendp);
760
761 for (split_output = 1; split_output <= N_SPLIT_OUTPUTS; split_output++) {
762 if (outputs[split_output] != NULL) {
763 fclose(outputs[split_output]);
764 }
765 }
766 } else if (output_file != NULL) {
767 fclose(outputs[0]);
768 } else {
769 /* Wrote to stdout */
770 }
771 #endif
772
773 FREE_KEEP(outputs);
774
775 return;
776 }
777
778
779 void
Outbuffer_free(T * old)780 Outbuffer_free (T *old) {
781
782 if (*old) {
783 #ifdef HAVE_PTHREAD
784 pthread_cond_destroy(&(*old)->filestring_avail_p);
785 pthread_mutex_destroy(&(*old)->lock);
786 #endif
787
788 FREE_KEEP(*old);
789 }
790
791 return;
792 }
793
794
795 unsigned int
Outbuffer_nread(T this)796 Outbuffer_nread (T this) {
797 return this->nread;
798 }
799
800 unsigned int
Outbuffer_nbeyond(T this)801 Outbuffer_nbeyond (T this) {
802 return this->nbeyond;
803 }
804
805
806 void
Outbuffer_add_nread(T this,unsigned int nread)807 Outbuffer_add_nread (T this, unsigned int nread) {
808
809 #ifdef HAVE_PTHREAD
810 pthread_mutex_lock(&this->lock);
811 #endif
812
813 if (nread == 0) {
814 /* Finished reading, so able to determine total reads in input */
815 this->ntotal = this->nread;
816 debug(fprintf(stderr,"__Outbuffer_add_nread added 0 reads, so setting ntotal to be %u\n",this->ntotal));
817
818 #ifdef HAVE_PTHREAD
819 pthread_cond_signal(&this->filestring_avail_p);
820 #endif
821
822 } else {
823 this->nread += nread;
824 #ifdef USE_MPI
825 this->ntotal = this->nread;
826 #endif
827 debug(fprintf(stderr,"__Outbuffer_add_nread added %d read, now %d\n",nread,this->nread));
828 }
829
830 #ifdef HAVE_PTHREAD
831 pthread_mutex_unlock(&this->lock);
832 #endif
833
834 return;
835 }
836
837
838 void
Outbuffer_add_nbeyond(T this)839 Outbuffer_add_nbeyond (T this) {
840
841 #ifdef HAVE_PTHREAD
842 pthread_mutex_lock(&this->lock);
843 #endif
844
845 this->nbeyond += 1;
846
847 #ifdef HAVE_PTHREAD
848 pthread_cond_signal(&this->filestring_avail_p);
849 pthread_mutex_unlock(&this->lock);
850 #endif
851
852 return;
853 }
854
855
856 #ifdef GSNAP
857 void
Outbuffer_put_filestrings(T this,Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)858 Outbuffer_put_filestrings (T this, Filestring_T fp, Filestring_T fp_failedinput, Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2) {
859
860 #ifdef HAVE_PTHREAD
861 pthread_mutex_lock(&this->lock);
862 #endif
863
864 this->tail = RRlist_push(&this->head,this->tail,fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
865 debug1(RRlist_dump(this->head,this->tail));
866 this->nprocessed += 1;
867
868 #ifdef HAVE_PTHREAD
869 debug(printf("Signaling that filestring is available\n"));
870 pthread_cond_signal(&this->filestring_avail_p);
871 pthread_mutex_unlock(&this->lock);
872 #endif
873
874 return;
875 }
876
877 #else
878 void
Outbuffer_put_filestrings(T this,Filestring_T fp,Filestring_T fp_failedinput)879 Outbuffer_put_filestrings (T this, Filestring_T fp, Filestring_T fp_failedinput) {
880
881 #ifdef HAVE_PTHREAD
882 pthread_mutex_lock(&this->lock);
883 #endif
884
885 this->tail = RRlist_push(&this->head,this->tail,fp,fp_failedinput);
886 debug1(RRlist_dump(this->head,this->tail));
887 this->nprocessed += 1;
888
889 #ifdef HAVE_PTHREAD
890 debug(printf("Signaling that filestring is available\n"));
891 pthread_cond_signal(&this->filestring_avail_p);
892 pthread_mutex_unlock(&this->lock);
893 #endif
894
895 return;
896 }
897 #endif
898
899
900
901 #ifdef GSNAP
902 void
Outbuffer_print_filestrings(Filestring_T fp,Filestring_T fp_failedinput,Filestring_T fp_failedinput_1,Filestring_T fp_failedinput_2)903 Outbuffer_print_filestrings (Filestring_T fp, Filestring_T fp_failedinput, Filestring_T fp_failedinput_1, Filestring_T fp_failedinput_2) {
904 SAM_split_output_type split_output;
905 #ifdef USE_MPI
906 MPI_File output;
907 #else
908 FILE *output;
909 #endif
910
911 #ifdef USE_MPI
912 split_output = Filestring_split_output(fp);
913 output = outputs[split_output];
914
915 #else
916 if (split_output_root != NULL) {
917 split_output = Filestring_split_output(fp);
918 if ((output = outputs[split_output]) == NULL) {
919 output = outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
920 if (split_output == OUTPUT_NONE) {
921 /* Don't print file headers, since no output will go to
922 stdout. Must be a nomapping when --nofails is specified */
923 } else {
924 print_file_headers(output);
925 }
926 }
927 } else if ((output = outputs[0]) == NULL) {
928 if (output_file == NULL) {
929 output = outputs[0] = stdout;
930 print_file_headers(stdout);
931 } else {
932 output = outputs[0] = SAM_header_open_file(/*split_output*/OUTPUT_FILE,output_file,appendp);
933 print_file_headers(output);
934 }
935 }
936 #endif
937
938 #ifdef USE_MPI
939 /* Prevents output from being broken up */
940 Filestring_stringify(fp);
941 #endif
942 Filestring_print(output,fp);
943 Filestring_free(&fp);
944
945 if (failedinput_root != NULL) {
946 if (fp_failedinput != NULL) {
947 #ifdef USE_MPI
948 Filestring_stringify(fp_failedinput);
949 #endif
950 Filestring_print(output_failedinput,fp_failedinput);
951 Filestring_free(&fp_failedinput);
952 }
953 if (fp_failedinput_1 != NULL) {
954 #ifdef USE_MPI
955 Filestring_stringify(fp_failedinput_1);
956 #endif
957 Filestring_print(output_failedinput_1,fp_failedinput_1);
958 Filestring_free(&fp_failedinput_1);
959 }
960 if (fp_failedinput_2 != NULL) {
961 #ifdef USE_MPI
962 Filestring_stringify(fp_failedinput_2);
963 #endif
964 Filestring_print(output_failedinput_2,fp_failedinput_2);
965 Filestring_free(&fp_failedinput_2);
966 }
967 }
968
969 return;
970 }
971
972 #else
973 void
Outbuffer_print_filestrings(Filestring_T fp,Filestring_T fp_failedinput)974 Outbuffer_print_filestrings (Filestring_T fp, Filestring_T fp_failedinput) {
975 SAM_split_output_type split_output;
976 #ifdef USE_MPI
977 MPI_File output;
978 #else
979 FILE *output;
980 #endif
981
982 #ifdef USE_MPI
983 split_output = Filestring_split_output(fp);
984 output = outputs[split_output];
985
986 #else
987 if (split_output_root != NULL) {
988 split_output = Filestring_split_output(fp);
989 if ((output = outputs[split_output]) == NULL) {
990 output = outputs[split_output] = SAM_header_open_file(split_output,split_output_root,appendp);
991 if (split_output == OUTPUT_NONE && split_output_root != NULL) {
992 /* Don't print file headers, since no output will go to
993 stdout. Must be a nomapping when --nofails is specified */
994 } else {
995 print_file_headers(output);
996 }
997 }
998
999 } else if ((output = outputs[0]) == NULL) {
1000 if (output_file == NULL) {
1001 output = outputs[0] = stdout;
1002 print_file_headers(stdout);
1003 } else {
1004 output = outputs[0] = SAM_header_open_file(/*split_output*/OUTPUT_FILE,output_file,appendp);
1005 print_file_headers(output);
1006 }
1007 }
1008 #endif
1009
1010 #ifdef USE_MPI
1011 Filestring_stringify(fp);
1012 #endif
1013 Filestring_print(output,fp);
1014 Filestring_free(&fp);
1015
1016 if (failedinput_root != NULL) {
1017 if (fp_failedinput != NULL) {
1018 #ifdef USE_MPI
1019 Filestring_stringify(fp_failedinput);
1020 #endif
1021 Filestring_print(output_failedinput,fp_failedinput);
1022 Filestring_free(&fp_failedinput);
1023 }
1024 }
1025
1026 return;
1027 }
1028
1029 #endif
1030
1031
1032
/* Output-thread body that prints results in the order workers queued
   them (not input order).  data is the Outbuffer_T.  Loops until the
   number printed plus nbeyond reaches ntotal, then returns NULL. */
void *
Outbuffer_thread_anyorder (void *data) {
  T this = (T) data;
  unsigned int output_buffer_size = this->output_buffer_size;
  unsigned int noutput = 0, ntotal, nbeyond;
  Filestring_T fp;
  Filestring_T fp_failedinput;
#ifdef GSNAP
  Filestring_T fp_failedinput_1, fp_failedinput_2;
#endif

  /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
#ifdef HAVE_PTHREAD
  pthread_mutex_lock(&this->lock);
#endif
  ntotal = this->ntotal;
  nbeyond = this->nbeyond;
#ifdef HAVE_PTHREAD
  pthread_mutex_unlock(&this->lock);
#endif

  while (noutput + nbeyond < ntotal) { /* Previously check against this->ntotal */
#ifdef HAVE_PTHREAD
    pthread_mutex_lock(&this->lock);
    /* Sleep until something is queued or the stopping condition has become true */
    while (this->head == NULL && noutput + this->nbeyond < this->ntotal) {
      debug(fprintf(stderr,"__outbuffer_thread_anyorder waiting for filestring_avail_p\n"));
      pthread_cond_wait(&this->filestring_avail_p,&this->lock);
    }
    debug(fprintf(stderr,"__outbuffer_thread_anyorder woke up\n"));
#endif

    if (this->head == NULL) {
      /* False wake up */
      /* Also reached when the stopping condition became true; the refreshed
	 ntotal/nbeyond at the bottom of the loop end it */
#ifdef HAVE_PTHREAD
      pthread_mutex_unlock(&this->lock);
#endif

    } else {
#ifdef GSNAP
      this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
#else
      this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
#endif
      debug1(RRlist_dump(this->head,this->tail));

#ifdef HAVE_PTHREAD
      /* Let worker threads put filestrings while we print */
      pthread_mutex_unlock(&this->lock);
#endif

#ifdef GSNAP
      Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
#else
      Outbuffer_print_filestrings(fp,fp_failedinput);
#endif
      noutput += 1;
      /* Result_free(&result); */
      /* Request_free(&request); */

#ifdef HAVE_PTHREAD
      pthread_mutex_lock(&this->lock);
#endif
      /* If more than output_buffer_size queued results are still unprinted,
	 print them while holding the lock; workers calling
	 Outbuffer_put_filestrings wait on the lock meanwhile */
      if (this->head && this->nprocessed - noutput > output_buffer_size) {
	/* Clear out backlog */
	while (this->head && this->nprocessed - noutput > output_buffer_size) {
#ifdef GSNAP
	  this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
#else
	  this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
#endif
	  debug1(RRlist_dump(this->head,this->tail));

#ifdef GSNAP
	  Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
#else
	  Outbuffer_print_filestrings(fp,fp_failedinput);
#endif
	  noutput += 1;
	  /* Result_free(&result); */
	  /* Request_free(&request); */
	}
      }
#ifdef HAVE_PTHREAD
      pthread_mutex_unlock(&this->lock);
#endif
    }

    debug(fprintf(stderr,"__outbuffer_thread_anyorder has noutput %d, nbeyond %d, ntotal %d\n",
		  noutput,nbeyond,ntotal));

    /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
#ifdef HAVE_PTHREAD
    pthread_mutex_lock(&this->lock);
#endif
    ntotal = this->ntotal;
    nbeyond = this->nbeyond;
#ifdef HAVE_PTHREAD
    pthread_mutex_unlock(&this->lock);
#endif
  }

  /* Every queued filestring must have been printed */
  assert(this->head == NULL);

  return (void *) NULL;
}
1138
1139
1140
1141 void *
Outbuffer_thread_ordered(void * data)1142 Outbuffer_thread_ordered (void *data) {
1143 T this = (T) data;
1144 unsigned int output_buffer_size = this->output_buffer_size;
1145 unsigned int noutput = 0, nqueued = 0, ntotal, nbeyond;
1146 Filestring_T fp;
1147 Filestring_T fp_failedinput;
1148 #ifdef GSNAP
1149 Filestring_T fp_failedinput_1, fp_failedinput_2;
1150 #endif
1151 RRlist_T queue = NULL;
1152 int id;
1153
1154 /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
1155 #ifdef HAVE_PTHREAD
1156 pthread_mutex_lock(&this->lock);
1157 #endif
1158 ntotal = this->ntotal;
1159 nbeyond = this->nbeyond;
1160 #ifdef HAVE_PTHREAD
1161 pthread_mutex_unlock(&this->lock);
1162 #endif
1163
1164 while (noutput + nbeyond < ntotal) { /* Previously checked against this->ntotal */
1165 #ifdef HAVE_PTHREAD
1166 pthread_mutex_lock(&this->lock);
1167 while (this->head == NULL && noutput + this->nbeyond < this->ntotal) {
1168 pthread_cond_wait(&this->filestring_avail_p,&this->lock);
1169 }
1170 debug(fprintf(stderr,"__outbuffer_thread_ordered woke up\n"));
1171 #endif
1172
1173 if (this->head == NULL) {
1174 #ifdef HAVE_PTHREAD
1175 /* False wake up, or signal from worker_mpi_process */
1176 ntotal = this->ntotal;
1177 nbeyond = this->nbeyond;
1178 pthread_mutex_unlock(&this->lock);
1179 #endif
1180
1181 } else {
1182 #ifdef GSNAP
1183 this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1184 #else
1185 this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
1186 #endif
1187
1188 #ifdef HAVE_PTHREAD
1189 /* Allow workers access to the queue */
1190 pthread_mutex_unlock(&this->lock);
1191 #endif
1192 if ((id = Filestring_id(fp)) != (int) noutput) {
1193 /* Store in queue */
1194 #ifdef GSNAP
1195 queue = RRlist_insert(queue,id,fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1196 #else
1197 queue = RRlist_insert(queue,id,fp,fp_failedinput);
1198 #endif
1199 nqueued++;
1200 } else {
1201 #ifdef GSNAP
1202 Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1203 #else
1204 Outbuffer_print_filestrings(fp,fp_failedinput);
1205 #endif
1206 noutput += 1;
1207
1208 /* Result_free(&result); */
1209 /* Request_free(&request); */
1210
1211 /* Print out rest of stored queue */
1212 while (queue != NULL && queue->id == (int) noutput) {
1213 #ifdef GSNAP
1214 queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1215 #else
1216 queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput);
1217 #endif
1218 nqueued--;
1219 #ifdef GSNAP
1220 Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1221 #else
1222 Outbuffer_print_filestrings(fp,fp_failedinput);
1223 #endif
1224 noutput += 1;
1225
1226 /* Result_free(&result); */
1227 /* Request_free(&request); */
1228 }
1229 }
1230
1231 #ifdef HAVE_PTHREAD
1232 pthread_mutex_lock(&this->lock);
1233 #endif
1234 if (this->head && this->nprocessed - nqueued - noutput > output_buffer_size) {
1235 /* Clear out backlog */
1236 while (this->head && this->nprocessed - nqueued - noutput > output_buffer_size) {
1237 #ifdef GSNAP
1238 this->head = RRlist_pop(this->head,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1239 #else
1240 this->head = RRlist_pop(this->head,&fp,&fp_failedinput);
1241 #endif
1242 if ((id = Filestring_id(fp)) != (int) noutput) {
1243 /* Store in queue */
1244 #ifdef GSNAP
1245 queue = RRlist_insert(queue,id,fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1246 #else
1247 queue = RRlist_insert(queue,id,fp,fp_failedinput);
1248 #endif
1249 nqueued++;
1250 } else {
1251 #ifdef GSNAP
1252 Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1253 #else
1254 Outbuffer_print_filestrings(fp,fp_failedinput);
1255 #endif
1256 noutput += 1;
1257 /* Result_free(&result); */
1258 /* Request_free(&request); */
1259
1260 /* Print out rest of stored queue */
1261 while (queue != NULL && queue->id == (int) noutput) {
1262 #ifdef GSNAP
1263 queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput,&fp_failedinput_1,&fp_failedinput_2);
1264 #else
1265 queue = RRlist_pop_id(queue,&id,&fp,&fp_failedinput);
1266 #endif
1267 nqueued--;
1268 #ifdef GSNAP
1269 Outbuffer_print_filestrings(fp,fp_failedinput,fp_failedinput_1,fp_failedinput_2);
1270 #else
1271 Outbuffer_print_filestrings(fp,fp_failedinput);
1272 #endif
1273 noutput += 1;
1274 /* Result_free(&result); */
1275 /* Request_free(&request); */
1276 }
1277 }
1278 }
1279 }
1280
1281 #ifdef HAVE_PTHREAD
1282 pthread_mutex_unlock(&this->lock);
1283 #endif
1284 }
1285
1286 debug(fprintf(stderr,"__outbuffer_thread_ordered has noutput %d, nbeyond %d, ntotal %d\n",
1287 noutput,nbeyond,ntotal));
1288
1289 /* Obtain this->ntotal while locked, to prevent race between output thread and input thread */
1290 #ifdef HAVE_PTHREAD
1291 pthread_mutex_lock(&this->lock);
1292 #endif
1293 ntotal = this->ntotal;
1294 nbeyond = this->nbeyond;
1295 #ifdef HAVE_PTHREAD
1296 pthread_mutex_unlock(&this->lock);
1297 #endif
1298 }
1299
1300 assert(queue == NULL);
1301
1302 return (void *) NULL;
1303 }
1304
1305