1 //#define GFF_DEBUG 1 //debugging guides loading
2 #include "rlink.h"
3 #include "tmerge.h"
4 #ifndef NOTHREADS
5 #include "GThreads.h"
6 #endif
7
8 //#define GMEMTRACE 1 //debugging mem allocation
9
10 #ifdef GMEMTRACE
11 #include "proc_mem.h"
12 #endif
13
14 #define VERSION "2.1.1"
15
16 //#define DEBUGPRINT 1
17
18 #ifdef DEBUGPRINT
19 #define DBGPRINT(x) GMessage(x)
20 #define DBGPRINT2(a,b) GMessage(a,b)
21 #define DBGPRINT3(a,b,c) GMessage(a,b,c)
22 #define DBGPRINT4(a,b,c,d) GMessage(a,b,c,d)
23 #define DBGPRINT5(a,b,c,d,e) GMessage(a,b,c,d,e)
24 #else
25 #define DBGPRINT(x)
26 #define DBGPRINT2(a,b)
27 #define DBGPRINT3(a,b,c)
28 #define DBGPRINT4(a,b,c,d)
29 #define DBGPRINT5(a,b,c,d,e)
30 #endif
31
32 #define USAGE "StringTie v" VERSION " usage:\n\
33 stringtie <input.bam ..> [-G <guide_gff>] [-l <label>] [-o <out_gtf>] [-p <cpus>]\n\
34 [-v] [-a <min_anchor_len>] [-m <min_tlen>] [-j <min_anchor_cov>] [-f <min_iso>]\n\
35 [-C <coverage_file_name>] [-c <min_bundle_cov>] [-g <bdist>] [-u] [-L]\n\
36 [-e] [-x <seqid,..>] [-A <gene_abund.out>] [-h] {-B | -b <dir_path>} \n\
37 Assemble RNA-Seq alignments into potential transcripts.\n\
38 Options:\n\
39 --version : print just the version at stdout and exit\n\
40 --conservative : conservative transcriptome assembly, same as -t -c 1.5 -f 0.05\n\
41 --rf assume stranded library fr-firststrand\n\
42 --fr assume stranded library fr-secondstrand\n\
43 -G reference annotation to use for guiding the assembly process (GTF/GFF3)\n\
44 -o output path/file name for the assembled transcripts GTF (default: stdout)\n\
45 -l name prefix for output transcripts (default: STRG)\n\
46 -f minimum isoform fraction (default: 0.01)\n\
47 -L use long reads settings (default:false)\n\
48 -R if long reads are provided, just clean and collapse the reads but do not assemble\n\
49 -m minimum assembled transcript length (default: 200)\n\
50 -a minimum anchor length for junctions (default: 10)\n\
51 -j minimum junction coverage (default: 1)\n\
52 -t disable trimming of predicted transcripts based on coverage\n\
53 (default: coverage trimming is enabled)\n\
54 -c minimum reads per bp coverage to consider for multi-exon transcript\n\
55 (default: 1)\n\
56 -s minimum reads per bp coverage to consider for single-exon transcript\n\
57 (default: 4.75)\n\
58 -v verbose (log bundle processing details)\n\
59 -g maximum gap allowed between read mappings (default: 50)\n\
60 -M fraction of bundle allowed to be covered by multi-hit reads (default:1)\n\
61 -p number of threads (CPUs) to use (default: 1)\n\
62 -A gene abundance estimation output file\n\
63 -B enable output of Ballgown table files which will be created in the\n\
64 same directory as the output GTF (requires -G, -o recommended)\n\
65 -b enable output of Ballgown table files but these files will be \n\
66 created under the directory path given as <dir_path>\n\
67 -e only estimate the abundance of given reference transcripts (requires -G)\n\
68 -x do not assemble any transcripts on the given reference sequence(s)\n\
69 -u no multi-mapping correction (default: correction enabled)\n\
70 -h print this usage message and exit\n\
71 \n\
72 Transcript merge usage mode: \n\
73 stringtie --merge [Options] { gtf_list | strg1.gtf ...}\n\
74 With this option StringTie will assemble transcripts from multiple\n\
75 input files generating a unified non-redundant set of isoforms. In this mode\n\
76 the following options are available:\n\
77 -G <guide_gff> reference annotation to include in the merging (GTF/GFF3)\n\
78 -o <out_gtf> output file name for the merged transcripts GTF\n\
79 (default: stdout)\n\
80 -m <min_len> minimum input transcript length to include in the merge\n\
81 (default: 50)\n\
82 -c <min_cov> minimum input transcript coverage to include in the merge\n\
83 (default: 0)\n\
84 -F <min_fpkm> minimum input transcript FPKM to include in the merge\n\
85 (default: 1.0)\n\
86 -T <min_tpm> minimum input transcript TPM to include in the merge\n\
87 (default: 1.0)\n\
88 -f <min_iso> minimum isoform fraction (default: 0.01)\n\
89 -g <gap_len> gap between transcripts to merge together (default: 250)\n\
90 -i keep merged transcripts with retained introns; by default\n\
91 these are not kept unless there is strong evidence for them\n\
92 -l <label> name prefix for output transcripts (default: MSTRG)\n\
93 "
94 /*
95 -C output a file with reference transcripts that are covered by reads\n\
96 -U unitigs are treated as reads and not as guides \n\ \\ not used now
97 -d disable adaptive read coverage mode (default: yes)\n\
98 -n sensitivity level: 0,1, or 2, 3, with 3 the most sensitive level (default 1)\n\ \\ deprecated for now
99 -O disable the coverage saturation limit and use a slower two-pass approach\n\
100 to process the input alignments, collapsing redundant reads\n\
101 -i the reference annotation contains partial transcripts\n\
102 -w weight the maximum flow algorithm towards the transcript with higher rate (abundance); default: no\n\
103 -y include EM algorithm in max flow estimation; default: no\n\
104 -z don't include source in the max flow algorithm\n\
105 -P output file with all transcripts in reference that are partially covered by reads
106 -M fraction of bundle allowed to be covered by multi-hit reads (paper uses default: 1)\n\
107 -c minimum bundle reads per bp coverage to consider for assembly (paper uses default: 3)\n\
108 -S more sensitive run (default: no) disabled for now \n\
109 -s coverage saturation threshold; further read alignments will be\n\ // this coverage saturation parameter is deprecated starting at version 1.0.5
110 ignored in a region where a local coverage depth of <maxcov> \n\
111 is reached (default: 1,000,000);\n\ \\ deprecated
112 -e (mergeMode) include estimated coverage information in the preidcted transcript\n\
113 -E (mergeMode) enable the name of the input transcripts to be included\n\
114 in the merge output (default: no)\n\
115 */
116 //---- globals
117
118 FILE* f_out=NULL;
119 FILE* c_out=NULL;
120 //#define B_DEBUG 1
121 #ifdef B_DEBUG
122 FILE* dbg_out=NULL;
123 #endif
124
125 GStr outfname;
126 GStr out_dir;
127 GStr tmp_path;
128 GStr tmpfname;
129 GStr genefname;
130 bool guided=false;
131 bool trim=true;
132 bool eonly=false; // parameter -e ; for mergeMode includes estimated coverage sum in the merged transcripts
133 bool longreads=false;
134 bool rawreads=false;
135 bool nomulti=false;
136 bool enableNames=false;
137 bool includecov=false;
138 bool fr_strand=false;
139 bool rf_strand=false;
140 //bool complete=true; // set by parameter -i the reference annotation contains partial transcripts
141 bool retained_intron=false; // set by parameter -i for merge option
142 bool geneabundance=false;
143 //bool partialcov=false;
144 int num_cpus=1;
145 int mintranscriptlen=200; // minimum length for a transcript to be printed
146 //int sensitivitylevel=1;
147 uint junctionsupport=10; // anchor length for junction to be considered well supported <- consider shorter??
148 int junctionthr=1; // number of reads needed to support a particular junction
149 float readthr=1; // read coverage per bundle bp to accept it; // paper uses 3
150 float singlethr=4.75;
151 uint bundledist=50; // reads at what distance should be considered part of separate bundles
152 uint runoffdist=200;
153 float mcov=1; // fraction of bundle allowed to be covered by multi-hit reads paper uses 1
154 int allowed_nodes=1000;
155 //bool adaptive=true; // adaptive read coverage -> depends on the overall gene coverage
156
157 int no_xs=0; // number of records without the xs tag
158
159 float fpkm_thr=1;
160 float tpm_thr=1;
161
162 // different options of implementation reflected with the next three options
163 bool includesource=true;
164 //bool EM=false;
165 //bool weight=false;
166
167 float isofrac=0.01;
168 bool isunitig=true;
169 GStr label("STRG");
170 GStr ballgown_dir;
171
172 GFastaDb* gfasta=NULL;
173
174 GStr guidegff;
175
176 bool debugMode=false;
177 bool verbose=false;
178 bool ballgown=false;
179
180 //int maxReadCov=1000000; //max local read coverage (changed with -s option)
181 //no more reads will be considered for a bundle if the local coverage exceeds this value
182 //(each exon is checked for this)
183
184 bool forceBAM = false; //useful for stdin (piping alignments into StringTie)
185
186 bool mergeMode = false; //--merge option
187 bool keepTempFiles = false; //--keeptmp
188
189 int GeneNo=0; //-- global "gene" counter
190 double Num_Fragments=0; //global fragment counter (aligned pairs)
191 double Frag_Len=0;
192 double Cov_Sum=0;
193 //bool firstPrint=true; //just for writing the GFF header before the first transcript is printed
194
195 GffNames* gseqNames=NULL; //used as a dictionary for genomic sequence names and ids
196
197 int refseqCount=0;
198
199
200 #ifdef GMEMTRACE
201 double maxMemRS=0;
202 double maxMemVM=0;
203 GStr maxMemBundle;
204 #endif
205
206
207 #ifndef NOTHREADS
208 //single producer, multiple consumers
209 //main thread/program is always loading the producer
210 GMutex dataMutex; //manage availability of data records ready to be loaded by main thread
211 GVec<int> dataClear; //indexes of data bundles cleared for loading by main thread (clear data pool)
212 GConditionVar haveBundles; //will notify a thread that a bundle was loaded in the ready queue
213 //(or that no more bundles are coming)
214 int bundleWork=1; // bit 0 set if bundles are still being prepared (BAM file not exhausted yet)
215 // bit 1 set if there are Bundles ready in the queue
216
217 //GFastMutex waitMutex;
218 GMutex waitMutex; // controls threadsWaiting (idle threads counter)
219
220 int threadsWaiting; // idle worker threads
221 GConditionVar haveThreads; //will notify the bundle loader when a thread
222 //is available to process the currently loaded bundle
223
224 GConditionVar haveClear; //will notify when bundle buf space available
225
226 GMutex queueMutex; //controls bundleQueue and bundles access
227
228 GFastMutex printMutex; //for writing the output to file
229
230 GFastMutex logMutex; //only when verbose - to avoid mangling the log output
231
232 GFastMutex bamReadingMutex;
233
234 GFastMutex countMutex;
235
236 #endif
237
238 GHash<int> excludeGseqs; //hash of chromosomes/contigs to exclude (e.g. chrM)
239
240 bool NoMoreBundles=false;
241 bool moreBundles(); //thread-safe retrieves NoMoreBundles
242 void noMoreBundles(); //sets NoMoreBundles to true
243 //--
244 void processOptions(GArgs& args);
245 char* sprintTime();
246
247 void processBundle(BundleData* bundle);
248 //void processBundle1stPass(BundleData* bundle); //two-pass testing
249
250 void writeUnbundledGuides(GVec<GRefData>& refdata, FILE* fout, FILE* gout=NULL);
251
252 #ifndef NOTHREADS
253
254 bool noThreadsWaiting();
255
256 void workerThread(GThreadData& td); // Thread function
257
258 //prepare the next free bundle for loading
259 int waitForData(BundleData* bundles);
260 #endif
261
262
263 TInputFiles bamreader;
264
main(int argc,char * argv[])265 int main(int argc, char* argv[]) {
266
267 // == Process arguments.
268 GArgs args(argc, argv,
269 "debug;help;version;conservative;keeptmp;rseq=;bam;fr;rf;merge;exclude=zEihvteuLRx:n:j:s:D:G:C:S:l:m:o:a:j:c:f:p:g:P:M:Bb:A:F:T:");
270 args.printError(USAGE, true);
271
272 processOptions(args);
273
274 GVec<GRefData> refguides; // plain vector with transcripts for each chromosome
275
276 //table indexes for Ballgown Raw Counts data (-B/-b option)
277 GPVec<RC_TData> guides_RC_tdata(true); //raw count data or other info for all guide transcripts
278 GPVec<RC_Feature> guides_RC_exons(true); //raw count data for all guide exons
279 GPVec<RC_Feature> guides_RC_introns(true);//raw count data for all guide introns
280
281 GVec<int> alncounts(30,0); //keep track of the number of read alignments per chromosome [gseq_id]
282
283 int bamcount=bamreader.start(); //setup and open input files
284 #ifndef GFF_DEBUG
285 if (bamcount<1) {
286 GError("%sError: no input files provided!\n",USAGE);
287 }
288 #endif
289
290 #ifdef DEBUGPRINT
291 verbose=true;
292 #endif
293
294 const char* ERR_BAM_SORT="\nError: the input alignment file is not sorted!\n";
295
296 if(guided) { // read guiding transcripts from input gff file
297 if (verbose) {
298 printTime(stderr);
299 GMessage(" Loading reference annotation (guides)..\n");
300 }
301 FILE* f=fopen(guidegff.chars(),"r");
302 if (f==NULL) GError("Error: could not open reference annotation file (%s)!\n",
303 guidegff.chars());
304 // transcripts_only sort by location?
305 GffReader gffr(f, true, true); //loading only recognizable transcript features
306 gffr.setRefAlphaSorted(); //alphabetical sorting of refseq IDs
307 gffr.showWarnings(verbose);
308 // keepAttrs mergeCloseExons noExonAttrs
309 gffr.readAll(false, true, true);
310 //the list of GffObj is in gffr.gflst, sorted by chromosome and start-end coordinates
311 //collect them in other data structures, if it's kept for later call gffobj->isUsed(true)
312 // (otherwise it'll be deallocated when gffr is destroyed due to going out of scope)
313 refseqCount=gffr.gseqtable.Count();
314 if (refseqCount==0 || gffr.gflst.Count()==0) {
315 GError("Error: could not any valid reference transcripts in %s (invalid GTF/GFF file?)\n",
316 guidegff.chars());
317 }
318 refguides.setCount(refseqCount); //maximum gseqid
319 uint c_tid=0;
320 uint c_exon_id=0;
321 uint c_intron_id=0;
322 GList<RC_Feature> uexons(true, false, true); //sorted, free items, unique
323 GList<RC_Feature> uintrons(true, false, true);
324 //assign unique transcript IDs based on the sorted order
325 int last_refid=-1;
326 bool skipGseq=false;
327 for (int i=0;i<gffr.gflst.Count();i++) {
328 GffObj* m=gffr.gflst[i];
329 if (last_refid!=m->gseq_id) {
330 //chromosome switch
331 if (ballgown) { //prepare memory storage/tables for all guides on this chromosome
332 uexons.Clear();
333 uintrons.Clear();
334 }
335 last_refid=m->gseq_id;
336 skipGseq=excludeGseqs.hasKey(m->getGSeqName());
337 }
338 //sanity check: make sure there are no exonless "genes" or other
339 if (skipGseq) continue;
340 if (m->exons.Count()==0) {
341 if (verbose)
342 GMessage("Warning: exonless GFF %s feature with ID %s found, added implicit exon %d-%d.\n",
343 m->getFeatureName(), m->getID(), m->start, m->end);
344 m->addExon(m->start, m->end); //should never happen!
345 }
346 //DONE: always keep a RC_TData pointer around, with additional info about guides
347 RC_TData* tdata=new RC_TData(*m, ++c_tid);
348 m->uptr=tdata;
349 guides_RC_tdata.Add(tdata);
350 if (ballgown) { //already gather exon & intron info for all ref transcripts
351 tdata->rc_addFeatures(c_exon_id, uexons, guides_RC_exons,
352 c_intron_id, uintrons, guides_RC_introns);
353 }
354 GRefData& grefdata = refguides[m->gseq_id];
355 grefdata.add(&gffr, m); //transcripts already sorted by location
356 }
357 if (verbose) {
358 printTime(stderr);
359 GMessage(" %d reference transcripts loaded.\n", gffr.gflst.Count());
360 }
361 }
362
363 #ifdef GFF_DEBUG
364 for (int r=0;r<refguides.Count();++r) {
365 GRefData& grefdata = refguides[r];
366 for (int k=0;k<grefdata.rnas.Count();++k) {
367 GMessage("#transcript #%d : %s (%d exons)\n", k, grefdata.rnas[k]->getID(), grefdata.rnas[k]->exons.Count());
368 grefdata.rnas[k]->printGff(stderr);
369 }
370 }
371 GMessage("GFF Debug mode, exiting...\n");
372 exit(0);
373 #endif
374
375 // --- here we do the input processing
376 gseqNames=GffObj::names; //might have been populated already by gff data
377 gffnames_ref(gseqNames); //initialize the names collection if not guided
378
379
380 GHash<int> hashread; //read_name:pos:hit_index => readlist index
381
382 GList<GffObj>* guides=NULL; //list of transcripts on a specific chromosome
383
384 int currentstart=0, currentend=0;
385 int ng_start=0;
386 int ng_end=-1;
387 int ng=0;
388 GStr lastref;
389 bool no_ref_used=true;
390 int lastref_id=-1; //last seen gseq_id
391 // int ncluster=0; used it for debug purposes only
392
393 //Ballgown files
394 FILE* f_tdata=NULL;
395 FILE* f_edata=NULL;
396 FILE* f_idata=NULL;
397 FILE* f_e2t=NULL;
398 FILE* f_i2t=NULL;
399 if (ballgown)
400 Ballgown_setupFiles(f_tdata, f_edata, f_idata, f_e2t, f_i2t);
401 #ifndef NOTHREADS
402 //model: one producer, multiple consumers
403 #define DEF_TSTACK_SIZE 8388608
404 int tstackSize=GThread::defaultStackSize();
405 size_t defStackSize=0;
406 if (tstackSize<DEF_TSTACK_SIZE) defStackSize=DEF_TSTACK_SIZE;
407 if (verbose) {
408 if (defStackSize>0){
409 int ssize=defStackSize;
410 GMessage("Default stack size for threads: %d (increased to %d)\n", tstackSize, ssize);
411 }
412 else GMessage("Default stack size for threads: %d\n", tstackSize);
413 }
414 GThread* threads=new GThread[num_cpus]; //bundle processing threads
415
416 GPVec<BundleData> bundleQueue(false); //queue of loaded bundles
417 //the consumers take (pop) bundles out of this queue for processing
418 //the producer populates this queue with bundles built from reading the BAM input
419
420 BundleData* bundles=new BundleData[num_cpus+1];
421 //bundles[0..num_cpus-1] are processed by threads, loading bundles[num_cpus] first
422
423 dataClear.setCapacity(num_cpus+1);
424 for (int b=0;b<num_cpus;b++) {
425 threads[b].kickStart(workerThread, (void*) &bundleQueue, defStackSize);
426 bundles[b+1].idx=b+1;
427 dataClear.Push(b);
428 }
429 BundleData* bundle = &(bundles[num_cpus]);
430 #else
431 BundleData bundles[1];
432 BundleData* bundle = &(bundles[0]);
433 #endif
434 GBamRecord* brec=NULL;
435 bool more_alns=true;
436 TAlnInfo* tinfo=NULL; // for --merge
437 int prev_pos=0;
438 bool skipGseq=false;
439 while (more_alns) {
440 bool chr_changed=false;
441 int pos=0;
442 const char* refseqName=NULL;
443 char xstrand=0;
444 int nh=1;
445 int hi=0;
446 int gseq_id=lastref_id; //current chr id
447 bool new_bundle=false;
448 //delete brec;
449 if ((brec=bamreader.next())!=NULL) {
450 if (brec->isUnmapped()) continue;
451 if (brec->start<1 || brec->mapped_len<10) {
452 if (verbose) GMessage("Warning: invalid mapping found for read %s (position=%d, mapped length=%d)\n",
453 brec->name(), brec->start, brec->mapped_len);
454 continue;
455 }
456
457 refseqName=brec->refName();
458 xstrand=brec->spliceStrand(); // tagged strand gets priority
459 if(xstrand=='.' && (fr_strand || rf_strand)) { // set strand if stranded library
460 if(brec->isPaired()) { // read is paired
461 if(brec->pairOrder()==1) { // first read in pair
462 if((rf_strand && brec->revStrand())||(fr_strand && !brec->revStrand())) xstrand='+';
463 else xstrand='-';
464 }
465 else {
466 if((rf_strand && brec->revStrand())||(fr_strand && !brec->revStrand())) xstrand='-';
467 else xstrand='+';
468 }
469 }
470 else {
471 if((rf_strand && brec->revStrand())||(fr_strand && !brec->revStrand())) xstrand='+';
472 else xstrand='-';
473 }
474 }
475
476 /*
477 if (xstrand=='.' && brec->exons.Count()>1) {
478 no_xs++;
479 continue; //skip spliced alignments lacking XS tag (e.g. HISAT alignments)
480 }
481 // I might still infer strand later */
482
483 if (refseqName==NULL) GError("Error: cannot retrieve target seq name from BAM record!\n");
484 pos=brec->start; //BAM is 0 based, but GBamRecord makes it 1-based
485 chr_changed=(lastref.is_empty() || lastref!=refseqName);
486 if (chr_changed) {
487 skipGseq=excludeGseqs.hasKey(refseqName);
488 gseq_id=gseqNames->gseqs.addName(refseqName);
489 if (guided) {
490 if (gseq_id>=refseqCount) {
491 if (verbose)
492 GMessage("WARNING: no reference transcripts found for genomic sequence \"%s\"! (mismatched reference names?)\n",
493 refseqName);
494 }
495 else no_ref_used=false;
496 }
497
498 if (alncounts.Count()<=gseq_id) {
499 alncounts.Resize(gseq_id+1, 0);
500 }
501 else if (alncounts[gseq_id]>0) GError(ERR_BAM_SORT);
502 prev_pos=0;
503 }
504 if (pos<prev_pos) GError(ERR_BAM_SORT);
505 prev_pos=pos;
506 if (skipGseq) continue;
507 alncounts[gseq_id]++;
508 nh=brec->tag_int("NH");
509 if (nh==0) nh=1;
510 hi=brec->tag_int("HI");
511 if (mergeMode) {
512 tinfo=new TAlnInfo(brec->name(), brec->tag_int("ZF"));
513 GStr score(brec->tag_str("ZS"));
514 if (!score.is_empty()) {
515 GStr srest=score.split('|');
516 if (!score.is_empty())
517 tinfo->cov=score.asDouble();
518 score=srest.split('|');
519 if (!srest.is_empty())
520 tinfo->fpkm=srest.asDouble();
521 srest=score.split('|');
522 if (!score.is_empty())
523 tinfo->tpm=score.asDouble();
524 }
525 }
526
527 if (!chr_changed && currentend>0 && pos>currentend+(int)runoffdist) {
528 new_bundle=true;
529 }
530 }
531 else { //no more alignments
532 more_alns=false;
533 new_bundle=true; //fake a new start (end of last bundle)
534 }
535
536 if (new_bundle || chr_changed) {
537 //bgeneids.Clear();
538 hashread.Clear();
539 if (bundle->readlist.Count()>0) { // process reads in previous bundle
540 // (readthr, junctionthr, mintranscriptlen are globals)
541 bundle->getReady(currentstart, currentend);
542 if (gfasta!=NULL) { //genomic sequence data requested
543 GFaSeqGet* faseq=gfasta->fetch(bundle->refseq.chars());
544 if (faseq==NULL) {
545 GError("Error: could not retrieve sequence data for %s!\n", bundle->refseq.chars());
546 }
547 bundle->gseq=faseq->copyRange(bundle->start, bundle->end, false, true);
548 }
549 #ifndef NOTHREADS
550 //push this in the bundle queue where it'll be picked up by the threads
551 DBGPRINT2("##> Locking queueMutex to push loaded bundle into the queue (bundle.start=%d)\n", bundle->start);
552 int qCount=0;
553 queueMutex.lock();
554 bundleQueue.Push(bundle);
555 bundleWork |= 0x02; //set bit 1
556 qCount=bundleQueue.Count();
557 queueMutex.unlock();
558 DBGPRINT2("##> bundleQueue.Count()=%d)\n", qCount);
559 //wait for a thread to pop this bundle from the queue
560 waitMutex.lock();
561 DBGPRINT("##> waiting for a thread to become available..\n");
562 while (threadsWaiting==0) {
563 haveThreads.wait(waitMutex);
564 }
565 waitMutex.unlock();
566 haveBundles.notify_one();
567 DBGPRINT("##> waitMutex unlocked, haveBundles notified, current thread yielding\n");
568 current_thread::yield();
569 queueMutex.lock();
570 DBGPRINT("##> queueMutex locked until bundleQueue.Count()==qCount\n");
571 while (bundleQueue.Count()==qCount) {
572 queueMutex.unlock();
573 DBGPRINT2("##> queueMutex unlocked as bundleQueue.Count()==%d\n", qCount);
574 haveBundles.notify_one();
575 current_thread::yield();
576 queueMutex.lock();
577 DBGPRINT("##> queueMutex locked again within while loop\n");
578 }
579 queueMutex.unlock();
580
581 #else //no threads
582 //Num_Fragments+=bundle->num_fragments;
583 //Frag_Len+=bundle->frag_len;
584 processBundle(bundle);
585 #endif
586 // ncluster++; used it for debug purposes only
587 } //have alignments to process
588 else { //no read alignments in this bundle?
589 #ifndef NOTHREADS
590 dataMutex.lock();
591 DBGPRINT2("##> dataMutex locked for bundle #%d clearing..\n", bundle->idx);
592 #endif
593 bundle->Clear();
594 #ifndef NOTHREADS
595 dataClear.Push(bundle->idx);
596 DBGPRINT2("##> dataMutex unlocking as dataClear got pushed idx #%d\n", bundle->idx);
597 dataMutex.unlock();
598 #endif
599 } //nothing to do with this bundle
600
601 if (chr_changed) {
602 if (guided) {
603 ng=0;
604 guides=NULL;
605 ng_start=0;
606 ng_end=-1;
607 if (refguides.Count()>gseq_id && refguides[gseq_id].rnas.Count()>0) {
608 guides=&(refguides[gseq_id].rnas);
609 ng=guides->Count();
610 }
611 }
612 lastref=refseqName;
613 lastref_id=gseq_id;
614 currentend=0;
615 }
616
617 if (!more_alns) {
618 if (verbose) {
619 #ifndef NOTHREADS
620 GLockGuard<GFastMutex> lock(logMutex);
621 #endif
622 if (Num_Fragments) {
623 printTime(stderr);
624 GMessage(" %g aligned fragments found.\n", Num_Fragments);
625 }
626 //GMessage(" Done reading alignments.\n");
627 }
628 noMoreBundles();
629 break;
630 }
631 #ifndef NOTHREADS
632
633 int new_bidx=waitForData(bundles);
634 if (new_bidx<0) {
635 //should never happen!
636 GError("Error: waitForData() returned invalid bundle index(%d)!\n",new_bidx);
637 break;
638 }
639 bundle=&(bundles[new_bidx]);
640 #endif
641 currentstart=pos;
642 currentend=brec->end;
643 if (guides) { //guided and guides!=NULL
644 ng_start=ng_end+1;
645 while (ng_start<ng && (int)(*guides)[ng_start]->end < pos) {
646 // for now, skip guides which have no overlap with current read
647 ng_start++;
648 }
649 int ng_ovl=ng_start;
650 //add all guides overlapping the current read and other guides that overlap them
651 while (ng_ovl<ng && (int)(*guides)[ng_ovl]->start<=currentend) { //while guide overlap
652 if (currentstart>(int)(*guides)[ng_ovl]->start)
653 currentstart=(*guides)[ng_ovl]->start;
654 if (currentend<(int)(*guides)[ng_ovl]->end)
655 currentend=(*guides)[ng_ovl]->end;
656 if (ng_ovl==ng_start && ng_ovl>0) { //first time only, we have to check back all possible transitive guide overlaps
657 //char* geneid=(*guides)[ng_ovlstart]->getGeneID();
658 //if (geneid==NULL) geneid=(*guides)[ng_ovlstart]->getGeneName();
659 //if (geneid && !bgeneids.hasKey(geneid))
660 // bgeneids.shkAdd(geneid, &ng); //whatever pointer to int
661 int g_back=ng_ovl; //start from the overlapping guide, going backwards
662 int g_ovl_start=ng_ovl;
663 while (g_back>ng_end+1) {
664 --g_back;
665 //if overlap, set g_back_start=g_back and update currentstart
666 if (currentstart<=(int)(*guides)[g_back]->end) {
667 g_ovl_start=g_back;
668 if (currentstart>(int)(*guides)[g_back]->start)
669 currentstart=(int)(*guides)[g_back]->start;
670 }
671 } //while checking previous guides that could be pulled in this bundle
672 for (int gb=g_ovl_start;gb<=ng_ovl;++gb) {
673 bundle->keepGuide((*guides)[gb],
674 &guides_RC_tdata, &guides_RC_exons, &guides_RC_introns);
675 }
676 } //needed to check previous guides for overlaps
677 else
678 bundle->keepGuide((*guides)[ng_ovl],
679 &guides_RC_tdata, &guides_RC_exons, &guides_RC_introns);
680 ng_ovl++;
681 } //while guide overlap
682 ng_end=ng_ovl-1; //MUST update ng_end here, even if no overlaps were found
683 } //guides present on the current chromosome
684 bundle->refseq=lastref;
685 bundle->start=currentstart;
686 bundle->end=currentend;
687 } //<---- new bundle started
688
689 if (currentend<(int)brec->end) {
690 //current read extends the bundle
691 //this might not happen if a longer guide had already been added to the bundle
692 currentend=brec->end;
693 if (guides) { //add any newly overlapping guides to bundle
694 bool cend_changed;
695 do {
696 cend_changed=false;
697 while (ng_end+1<ng && (int)(*guides)[ng_end+1]->start<=currentend) {
698 ++ng_end;
699 //more transcripts overlapping this bundle?
700 if ((int)(*guides)[ng_end]->end>=currentstart) {
701 //it should really overlap the bundle
702 bundle->keepGuide((*guides)[ng_end],
703 &guides_RC_tdata, &guides_RC_exons, &guides_RC_introns);
704 if(currentend<(int)(*guides)[ng_end]->end) {
705 currentend=(*guides)[ng_end]->end;
706 cend_changed=true;
707 }
708 }
709 }
710 } while (cend_changed);
711 }
712 } //adjusted currentend and checked for overlapping reference transcripts
713 GReadAlnData alndata(brec, 0, nh, hi, tinfo);
714 bool ovlpguide=bundle->evalReadAln(alndata, xstrand);
715 if(!eonly || ovlpguide) { // in eonly case consider read only if it overlaps guide
716 //check for overlaps with ref transcripts which may set xstrand
717 if (xstrand=='+') alndata.strand=1;
718 else if (xstrand=='-') alndata.strand=-1;
719 //GMessage("%s\t%c\t%d\thi=%d\n",brec->name(), xstrand, alndata.strand,hi);
720 //countFragment(*bundle, *brec, hi,nh); // we count this in build_graphs to only include mapped fragments that we consider correctly mapped
721 //fprintf(stderr,"fragno=%d fraglen=%lu\n",bundle->num_fragments,bundle->frag_len);if(bundle->num_fragments==100) exit(0);
722 //if (!ballgown || ref_overlap)
723 processRead(currentstart, currentend, *bundle, hashread, alndata);
724 // *brec, strand, nh, hi);
725 }
726 } //for each read alignment
727
728 //cleaning up
729 delete brec;
730 //bamreader.bclose();
731 bamreader.stop(); //close all BAM files
732
733 if (guided && no_ref_used) {
734 GMessage("WARNING: no reference transcripts were found for the genomic sequences where reads were mapped!\n"
735 "Please make sure the -G annotation file uses the same naming convention for the genome sequences.\n");
736 }
737
738 delete gfasta;
739
740 #ifndef NOTHREADS
741 for (int t=0;t<num_cpus;t++)
742 threads[t].join();
743 if (verbose) {
744 printTime(stderr);
745 GMessage(" All threads finished.\n");
746 }
747 delete[] threads;
748 delete[] bundles;
749 #else
750 if (verbose) {
751 printTime(stderr);
752 GMessage(" Done.\n");
753 }
754 #endif
755
756 #ifdef B_DEBUG
757 fclose(dbg_out);
758 #endif
759 if (mergeMode && guided )
760 writeUnbundledGuides(refguides, f_out);
761
762 fclose(f_out);
763 if (c_out && c_out!=stdout) fclose(c_out);
764
765 if(verbose && no_xs>0)
766 GMessage("Number spliced alignments missing the XS tag (skipped): %d\n",no_xs);
767
768 if(!mergeMode) {
769 if(verbose) {
770 GMessage("Total count of aligned fragments: %g\n", Num_Fragments);
771 if (Num_Fragments)
772 GMessage("Fragment coverage length: %g\n", Frag_Len/Num_Fragments);
773 }
774
775 f_out=stdout;
776 if(outfname!="stdout") {
777 f_out=fopen(outfname.chars(), "w");
778 if (f_out==NULL) GError("Error creating output file %s\n", outfname.chars());
779 }
780
781 fprintf(f_out,"# ");
782 args.printCmdLine(f_out);
783 fprintf(f_out,"# StringTie version %s\n",VERSION);
784
785 //fprintf(stderr,"cov_sum=%g frag_len=%g num_frag=%g\n",Cov_Sum,Frag_Len,Num_Fragments);
786
787 FILE *g_out=NULL;
788 if(geneabundance) {
789 g_out=fopen(genefname.chars(),"w");
790 if (g_out==NULL)
791 GError("Error creating gene abundance output file %s\n", genefname.chars());
792 fprintf(g_out,"Gene ID\tGene Name\tReference\tStrand\tStart\tEnd\tCoverage\tFPKM\tTPM\n");
793 }
794
795 FILE* ftmp_in=fopen(tmpfname.chars(),"rt");
796 if (ftmp_in!=NULL) {
797 char* linebuf=NULL;
798 int linebuflen=5000;
799 GMALLOC(linebuf, linebuflen);
800 int nl;
801 int istr;
802 int tlen;
803 float tcov; //do we need to increase precision here ? (double)
804 float calc_fpkm;
805 float calc_tpm;
806 int t_id;
807 while(fgetline(linebuf,linebuflen,ftmp_in)) {
808 sscanf(linebuf,"%d %d %d %d %g", &istr, &nl, &tlen, &t_id, &tcov);
809 if (tcov<0) tcov=0;
810 if (Frag_Len>0.001) calc_fpkm=tcov*1000000000/Frag_Len;
811 else calc_fpkm=0.0;
812 if (Cov_Sum>0.00001) calc_tpm=tcov*1000000/Cov_Sum;
813 else calc_tpm=0.0;
814 if(istr) { // this is a transcript
815 if (ballgown && t_id>0) {
816 guides_RC_tdata[t_id-1]->fpkm=calc_fpkm;
817 guides_RC_tdata[t_id-1]->cov=tcov;
818 }
819 for(int i=0;i<nl;i++) {
820 fgetline(linebuf,linebuflen,ftmp_in);
821 if(!i) {
822 //linebuf[strlen(line)-1]='\0';
823 fprintf(f_out,"%s",linebuf);
824 fprintf(f_out," FPKM \"%.6f\";",calc_fpkm);
825 fprintf(f_out," TPM \"%.6f\";",calc_tpm);
826 fprintf(f_out,"\n");
827 }
828 else fprintf(f_out,"%s\n",linebuf);
829 }
830 }
831 else { // this is a gene -> different file pointer
832 fgetline(linebuf, linebuflen, ftmp_in);
833 fprintf(g_out, "%s\t%.6f\t%.6f\n", linebuf, calc_fpkm, calc_tpm);
834 }
835 }
836 if (guided) {
837 writeUnbundledGuides(refguides, f_out, g_out);
838 }
839 fclose(f_out);
840 fclose(ftmp_in);
841 if(geneabundance) fclose(g_out);
842 GFREE(linebuf);
843 if (!keepTempFiles) {
844 remove(tmpfname.chars());
845 }
846 }
847 else {
848 fclose(f_out);
849 GError("No temporary file %s present!\n",tmpfname.chars());
850 }
851
852 //lastly, for ballgown, rewrite the tdata file with updated cov and fpkm
853 if (ballgown) {
854 rc_writeRC(guides_RC_tdata, guides_RC_exons, guides_RC_introns,
855 f_tdata, f_edata, f_idata, f_e2t, f_i2t);
856 }
857 }
858
859 if (!keepTempFiles) {
860 tmp_path.chomp('/');
861 remove(tmp_path);
862 }
863
864
865 gffnames_unref(gseqNames); //deallocate names collection
866
867
868 #ifdef GMEMTRACE
869 if(verbose) GMessage(" Max bundle memory: %6.1fMB for bundle %s\n", maxMemRS/1024, maxMemBundle.chars());
870 #endif
871 } // -- END main
872
873 //----------------------------------------
sprintTime()874 char* sprintTime() {
875 static char sbuf[32];
876 time_t ltime; /* calendar time */
877 ltime=time(NULL);
878 struct tm *t=localtime(<ime);
879 sprintf(sbuf, "%02d_%02d_%02d:%02d:%02d",t->tm_mon+1, t->tm_mday,
880 t->tm_hour, t->tm_min, t->tm_sec);
881 return(sbuf);
882 }
883
processOptions(GArgs & args)884 void processOptions(GArgs& args) {
885
886
887 if (args.getOpt('h') || args.getOpt("help")) {
888 fprintf(stdout,"%s",USAGE);
889 exit(0);
890 }
891 if (args.getOpt("version")) {
892 fprintf(stdout,"%s\n",VERSION);
893 exit(0);
894 }
895
896
897 longreads=(args.getOpt('L')!=NULL);
898 if(longreads) {
899 bundledist=0;
900 singlethr=1.5;
901 }
902
903
904 if (args.getOpt("conservative")) {
905 isofrac=0.05;
906 singlethr=4.75;
907 readthr=1.5;
908 trim=false;
909 }
910
911 if (args.getOpt('t')) {
912 trim=false;
913 }
914
915 if (args.getOpt("fr")) {
916 fr_strand=true;
917 }
918 if (args.getOpt("rf")) {
919 rf_strand=true;
920 if(fr_strand) GError("Error: --fr and --rf options are incompatible.\n");
921 }
922
923 debugMode=(args.getOpt("debug")!=NULL || args.getOpt('D')!=NULL);
924 forceBAM=(args.getOpt("bam")!=NULL); //assume the stdin stream is BAM instead of text SAM
925 mergeMode=(args.getOpt("merge")!=NULL);
926 keepTempFiles=(args.getOpt("keeptmp")!=NULL);
927 //adaptive=!(args.getOpt('d')!=NULL);
928 verbose=(args.getOpt('v')!=NULL);
929 if (verbose) {
930 fprintf(stderr, "Running StringTie " VERSION ". Command line:\n");
931 args.printCmdLine(stderr);
932 }
933 //complete=!(args.getOpt('i')!=NULL);
934 // trim=!(args.getOpt('t')!=NULL);
935 includesource=!(args.getOpt('z')!=NULL);
936 //EM=(args.getOpt('y')!=NULL);
937 //weight=(args.getOpt('w')!=NULL);
938
939 GStr s=args.getOpt('m');
940 if (!s.is_empty()) {
941 mintranscriptlen=s.asInt();
942 if (!mergeMode) {
943 if (mintranscriptlen<30)
944 GError("Error: invalid -m value, must be >=30)\n");
945 }
946 else if (mintranscriptlen<0) GError("Error: invalid -m value, must be >=0)\n");
947 }
948 else if(mergeMode) mintranscriptlen=50;
949
950 /*
951 if (args.getOpt('S')) {
952 // sensitivitylevel=2; no longer supported from version 1.0.3
953 sensitivitylevel=1;
954 }
955 */
956
957 s=args.getOpt("rseq");
958 if (s.is_empty())
959 s=args.getOpt('S');
960 if (!s.is_empty()) {
961 gfasta=new GFastaDb(s.chars());
962 }
963
964 s=args.getOpt('x');
965 if (!s.is_empty()) {
966 //split by comma and populate excludeGSeqs
967 s.startTokenize(" ,\t");
968 GStr chrname;
969 while (s.nextToken(chrname)) {
970 excludeGseqs.Add(chrname.chars(),new int(0));
971 }
972 }
973
974 /*
975 s=args.getOpt('n');
976 if (!s.is_empty()) {
977 sensitivitylevel=s.asInt();
978 if(sensitivitylevel<0) {
979 sensitivitylevel=0;
980 GMessage("sensitivity level out of range: setting sensitivity level at 0\n");
981 }
982 if(sensitivitylevel>3) {
983 sensitivitylevel=3;
984 GMessage("sensitivity level out of range: setting sensitivity level at 2\n");
985 }
986 }
987 */
988
989
990 s=args.getOpt('p');
991 if (!s.is_empty()) {
992 num_cpus=s.asInt();
993 if (num_cpus<=0) num_cpus=1;
994 }
995
996 s=args.getOpt('a');
997 if (!s.is_empty()) {
998 junctionsupport=(uint)s.asInt();
999 }
1000
1001 s=args.getOpt('j');
1002 if (!s.is_empty()) junctionthr=s.asInt();
1003
1004
1005 rawreads=(args.getOpt('R')!=NULL);
1006 if(rawreads) {
1007 if(!longreads) {
1008 if(verbose) GMessage("Enable longreads processing\n");
1009 longreads=true;
1010 bundledist=0;
1011 }
1012 readthr=0;
1013 }
1014
1015 s=args.getOpt('c');
1016 if (!s.is_empty()) {
1017 readthr=(float)s.asDouble();
1018 if (readthr<0.001 && !mergeMode) {
1019 GError("Error: invalid -c value, must be >=0.001)\n");
1020 }
1021 }
1022 else if(mergeMode) readthr=0;
1023
1024
1025 s=args.getOpt('g');
1026 if (!s.is_empty()) {
1027 bundledist=s.asInt();
1028 if(bundledist>runoffdist) runoffdist=bundledist;
1029 }
1030 else if(mergeMode) bundledist=250; // should figure out here a reasonable parameter for merge
1031
1032 s=args.getOpt('F');
1033 if (!s.is_empty()) {
1034 fpkm_thr=(float)s.asDouble();
1035 }
1036 //else if(mergeMode) fpkm_thr=0;
1037
1038 s=args.getOpt('T');
1039 if (!s.is_empty()) {
1040 tpm_thr=(float)s.asDouble();
1041 }
1042 //else if(mergeMode) tpm_thr=0;
1043
1044 s=args.getOpt('l');
1045 if (!s.is_empty()) label=s;
1046 else if(mergeMode) label="MSTRG";
1047
1048 s=args.getOpt('f');
1049 if (!s.is_empty()) {
1050 isofrac=(float)s.asDouble();
1051 if(isofrac>=1) GError("Miminum isoform fraction (-f coefficient: %f) needs to be less than 1\n",isofrac);
1052 }
1053 else if(mergeMode) isofrac=0.01;
1054 s=args.getOpt('M');
1055 if (!s.is_empty()) {
1056 mcov=(float)s.asDouble();
1057 }
1058
1059 genefname=args.getOpt('A');
1060 if(!genefname.is_empty()) {
1061 geneabundance=true;
1062 }
1063
1064 tmpfname=args.getOpt('o');
1065
1066 // coverage saturation no longer used after version 1.0.4; left here for compatibility with previous versions
1067 s=args.getOpt('s');
1068 if (!s.is_empty()) {
1069 singlethr=(float)s.asDouble();
1070 if (readthr<0.001 && !mergeMode) {
1071 GError("Error: invalid -s value, must be >=0.001)\n");
1072 }
1073 }
1074
1075 if (args.getOpt('G')) {
1076 guidegff=args.getOpt('G');
1077 if (fileExists(guidegff.chars())>1)
1078 guided=true;
1079 else GError("Error: reference annotation file (%s) not found.\n",
1080 guidegff.chars());
1081 }
1082
1083 enableNames=(args.getOpt('E')!=NULL);
1084
1085 retained_intron=(args.getOpt('i')!=NULL);
1086
1087 nomulti=(args.getOpt('u')!=NULL);
1088
1089 //isunitig=(args.getOpt('U')!=NULL);
1090
1091 eonly=(args.getOpt('e')!=NULL);
1092 if(eonly && rawreads) {
1093 if(verbose) GMessage("Error: can not use -e and -R at the same time; parameter -e will be ignored\n");
1094 }
1095 else if(eonly && mergeMode) {
1096 eonly=false;
1097 includecov=true;
1098 }
1099 else if(eonly && !guided)
1100 GError("Error: invalid -e usage, GFF reference not given (-G option required).\n");
1101
1102
1103 ballgown_dir=args.getOpt('b');
1104 ballgown=(args.getOpt('B')!=NULL);
1105 if (ballgown && !ballgown_dir.is_empty()) {
1106 GError("Error: please use either -B or -b <path> options, not both.");
1107 }
1108 if ((ballgown || !ballgown_dir.is_empty()) && !guided)
1109 GError("Error: invalid -B/-b usage, GFF reference not given (-G option required).\n");
1110
1111 /* s=args->getOpt('P');
1112 if (!s.is_empty()) {
1113 if(!guided) GError("Error: option -G with reference annotation file has to be specified.\n");
1114 c_out=fopen(s.chars(), "w");
1115 if (c_out==NULL) GError("Error creating output file %s\n", s.chars());
1116 partialcov=true;
1117 }
1118 else { */
1119 s=args.getOpt('C');
1120 if (!s.is_empty()) {
1121 if(!guided) GError("Error: invalid -C usage, GFF reference not given (-G option required).\n");
1122 c_out=fopen(s.chars(), "w");
1123 if (c_out==NULL) GError("Error creating output file %s\n", s.chars());
1124 }
1125 //}
1126 int numbam=args.startNonOpt();
1127 #ifndef GFF_DEBUG
1128 if (numbam < 1 ) {
1129 GMessage("%s\nError: no input file provided!\n",USAGE);
1130 exit(1);
1131 }
1132 #endif
1133 const char* ifn=NULL;
1134 while ( (ifn=args.nextNonOpt())!=NULL) {
1135 //input alignment files
1136 bamreader.Add(ifn);
1137 }
1138 //deferred creation of output path
1139 outfname="stdout";
1140 out_dir="./";
1141 if (!tmpfname.is_empty() && tmpfname!="-") {
1142 if (tmpfname[0]=='.' && tmpfname[1]=='/')
1143 tmpfname.cut(0,2);
1144 outfname=tmpfname;
1145 int pidx=outfname.rindex('/');
1146 if (pidx>=0) {//path given
1147 out_dir=outfname.substr(0,pidx+1);
1148 tmpfname=outfname.substr(pidx+1);
1149 }
1150 }
1151 else { // stdout
1152 tmpfname=outfname;
1153 char *stime=sprintTime();
1154 tmpfname.tr(":","-");
1155 tmpfname+='.';
1156 tmpfname+=stime;
1157 }
1158 if (out_dir!="./") {
1159 if (fileExists(out_dir.chars())==0) {
1160 //directory does not exist, create it
1161 if (Gmkdir(out_dir.chars()) && !fileExists(out_dir.chars())) {
1162 GError("Error: cannot create directory %s!\n", out_dir.chars());
1163 }
1164 }
1165 }
1166 if (!genefname.is_empty()) {
1167 if (genefname[0]=='.' && genefname[1]=='/')
1168 genefname.cut(0,2);
1169 //attempt to create the gene abundance path
1170 GStr genefdir("./");
1171 int pidx=genefname.rindex('/');
1172 if (pidx>=0) { //get the path part
1173 genefdir=genefname.substr(0,pidx+1);
1174 //genefname=genefname.substr(pidx+1);
1175 }
1176 if (genefdir!="./") {
1177 if (fileExists(genefdir.chars())==0) {
1178 //directory does not exist, create it
1179 if (Gmkdir(genefdir.chars()) || !fileExists(genefdir.chars())) {
1180 GError("Error: cannot create directory %s!\n", genefdir.chars());
1181 }
1182 }
1183 }
1184
1185 }
1186
1187 { //prepare temp path
1188 GStr stempl(out_dir);
1189 stempl.chomp('/');
1190 stempl+="/tmp.XXXXXXXX";
1191 char* ctempl=Gstrdup(stempl.chars());
1192 Gmktempdir(ctempl);
1193 tmp_path=ctempl;
1194 tmp_path+='/';
1195 GFREE(ctempl);
1196 }
1197
1198 tmpfname=tmp_path+tmpfname;
1199
1200 if (ballgown) ballgown_dir=out_dir;
1201 else if (!ballgown_dir.is_empty()) {
1202 ballgown=true;
1203 ballgown_dir.chomp('/');ballgown_dir+='/';
1204 if (fileExists(ballgown_dir.chars())==0) {
1205 //directory does not exist, create it
1206 if (Gmkdir(ballgown_dir.chars()) && !fileExists(ballgown_dir.chars())) {
1207 GError("Error: cannot create directory %s!\n", ballgown_dir.chars());
1208 }
1209
1210 }
1211 }
1212 #ifdef B_DEBUG
1213 GStr dbgfname(tmpfname);
1214 dbgfname+=".dbg";
1215 dbg_out=fopen(dbgfname.chars(), "w");
1216 if (dbg_out==NULL) GError("Error creating debug output file %s\n", dbgfname.chars());
1217 #endif
1218
1219 if(mergeMode) {
1220 f_out=stdout;
1221 if(outfname!="stdout") {
1222 f_out=fopen(outfname.chars(), "w");
1223 if (f_out==NULL) GError("Error creating output file %s\n", outfname.chars());
1224 }
1225 fprintf(f_out,"# ");
1226 args.printCmdLine(f_out);
1227 fprintf(f_out,"# StringTie version %s\n",VERSION);
1228 }
1229 else {
1230 tmpfname+=".tmp";
1231 f_out=fopen(tmpfname.chars(), "w");
1232 if (f_out==NULL) GError("Error creating output file %s\n", tmpfname.chars());
1233 }
1234 }
1235
1236 //---------------
moreBundles()1237 bool moreBundles() { //getter (interogation)
1238 bool v=true;
1239 #ifndef NOTHREADS
1240 GLockGuard<GFastMutex> lock(bamReadingMutex);
1241 #endif
1242 v = ! NoMoreBundles;
1243 return v;
1244 }
1245
noMoreBundles()1246 void noMoreBundles() {
1247 #ifndef NOTHREADS
1248 bamReadingMutex.lock();
1249 NoMoreBundles=true;
1250 bamReadingMutex.unlock();
1251 queueMutex.lock();
1252 bundleWork &= ~(int)0x01; //clear bit 0;
1253 queueMutex.unlock();
1254 bool areThreadsWaiting=true;
1255 do {
1256 waitMutex.lock();
1257 areThreadsWaiting=(threadsWaiting>0);
1258 waitMutex.unlock();
1259 if (areThreadsWaiting) {
1260 DBGPRINT("##> NOTIFY ALL workers: no more data!\n");
1261 haveBundles.notify_all();
1262 current_thread::sleep_for(10);
1263 waitMutex.lock();
1264 areThreadsWaiting=(threadsWaiting>0);
1265 waitMutex.unlock();
1266 current_thread::sleep_for(10);
1267 }
1268 } while (areThreadsWaiting); //paranoid check that all threads stopped waiting
1269 #else
1270 NoMoreBundles=true;
1271 #endif
1272 }
1273
processBundle(BundleData * bundle)1274 void processBundle(BundleData* bundle) {
1275 if (verbose) {
1276 #ifndef NOTHREADS
1277 GLockGuard<GFastMutex> lock(logMutex);
1278 #endif
1279 printTime(stderr);
1280 GMessage(">bundle %s:%d-%d [%lu alignments (%d distinct), %d junctions, %d guides] begins processing...\n",
1281 bundle->refseq.chars(), bundle->start, bundle->end, bundle->numreads, bundle->readlist.Count(), bundle->junction.Count(),
1282 bundle->keepguides.Count());
1283 #ifdef GMEMTRACE
1284 double vm,rsm;
1285 get_mem_usage(vm, rsm);
1286 GMessage("\t\tstart memory usage: %6.1fMB\n",rsm/1024);
1287 if (rsm>maxMemRS) {
1288 maxMemRS=rsm;
1289 maxMemVM=vm;
1290 maxMemBundle.format("%s:%d-%d(%d)", bundle->refseq.chars(), bundle->start, bundle->end, bundle->readlist.Count());
1291 }
1292 #endif
1293 }
1294 #ifdef B_DEBUG
1295 for (int i=0;i<bundle->keepguides.Count();++i) {
1296 GffObj& t=*(bundle->keepguides[i]);
1297 RC_TData* tdata=(RC_TData*)(t.uptr);
1298 fprintf(dbg_out, ">%s (t_id=%d) %s%c %d %d\n", t.getID(), tdata->t_id, t.getGSeqName(), t.strand, t.start, t.end );
1299 for (int fe=0;fe < tdata->t_exons.Count(); ++fe) {
1300 RC_Feature& exoninfo = *(tdata->t_exons[fe]);
1301 fprintf(dbg_out, "%d\texon\t%d\t%d\t%c\t%d\t%d\n", exoninfo.id, exoninfo.l, exoninfo.r,
1302 exoninfo.strand, exoninfo.rcount, exoninfo.ucount);
1303 if (! (exoninfo==*(bundle->rc_data->guides_RC_exons->Get(exoninfo.id-1))))
1304 GError("exoninfo with id (%d) not matching!\n", exoninfo.id);
1305 }
1306 for (int fi=0;fi < tdata->t_introns.Count(); ++fi) {
1307 RC_Feature& introninfo = *(tdata->t_introns[fi]);
1308 fprintf(dbg_out, "%d\tintron\t%d\t%d\t%c\t%d\t%d\n", introninfo.id, introninfo.l, introninfo.r,
1309 introninfo.strand, introninfo.rcount, introninfo.ucount);
1310 if (! (introninfo==*(bundle->rc_data->guides_RC_introns->Get(introninfo.id-1))))
1311 GError("introninfo with id (%d) not matching!\n", introninfo.id);
1312 }
1313 //check that IDs are properly assigned
1314 if (tdata->t_id!=(uint)t.udata) GError("tdata->t_id(%d) not matching t.udata(%d)!\n",tdata->t_id, t.udata);
1315 if (tdata->t_id!=bundle->rc_data->guides_RC_tdata->Get(tdata->t_id-1)->t_id)
1316 GError("tdata->t_id(%d) not matching rc_data[t_id-1]->t_id (%d)\n", tdata->t_id, bundle->rc_data->g_tdata[tdata->t_id-1]->t_id);
1317
1318 }
1319 #endif
1320 infer_transcripts(bundle);
1321
1322 if (ballgown && bundle->rc_data) {
1323 rc_update_exons(*(bundle->rc_data));
1324 }
1325 if (bundle->pred.Count()>0 || ((eonly || geneabundance) && bundle->keepguides.Count()>0)) {
1326 #ifndef NOTHREADS
1327 GLockGuard<GFastMutex> lock(printMutex);
1328 #endif
1329 if(mergeMode) GeneNo=printMergeResults(bundle, GeneNo,bundle->refseq);
1330 else GeneNo=printResults(bundle, GeneNo, bundle->refseq);
1331 }
1332
1333 if (bundle->num_fragments) {
1334 #ifndef NOTHREADS
1335 GLockGuard<GFastMutex> lock(countMutex);
1336 #endif
1337 Num_Fragments+=bundle->num_fragments;
1338 Frag_Len+=bundle->frag_len;
1339 Cov_Sum+=bundle->sum_cov;
1340 }
1341
1342 if (verbose) {
1343 #ifndef NOTHREADS
1344 GLockGuard<GFastMutex> lock(logMutex);
1345 #endif
1346 /*
1347 SumReads+=bundle->sumreads;
1348 SumFrag+=bundle->sumfrag;
1349 NumCov+=bundle->num_cov;
1350 NumReads+=bundle->num_reads;
1351 NumFrag+=bundle->num_frag;
1352 NumFrag3+=bundle->num_fragments3;
1353 SumFrag3+=bundle->sum_fragments3;
1354 fprintf(stderr,"Number of fragments in bundle: %g with length %g\n",bundle->num_fragments,bundle->frag_len);
1355 fprintf(stderr,"Number of fragments in bundle: %g with sum %g\n",bundle->num_fragments,bundle->frag_len);
1356 */
1357 printTime(stderr);
1358 GMessage("^bundle %s:%d-%d done (%d processed potential transcripts).\n",bundle->refseq.chars(),
1359 bundle->start, bundle->end, bundle->pred.Count());
1360 #ifdef GMEMTRACE
1361 double vm,rsm;
1362 get_mem_usage(vm, rsm);
1363 GMessage("\t\tfinal memory usage: %6.1fMB\n",rsm/1024);
1364 if (rsm>maxMemRS) {
1365 maxMemRS=rsm;
1366 maxMemVM=vm;
1367 maxMemBundle.format("%s:%d-%d(%d)", bundle->refseq.chars(), bundle->start, bundle->end, bundle->readlist.Count());
1368 }
1369 #endif
1370 }
1371 bundle->Clear();
1372 }
1373
1374 #ifndef NOTHREADS
1375
noThreadsWaiting()1376 bool noThreadsWaiting() {
1377 waitMutex.lock();
1378 int v=threadsWaiting;
1379 waitMutex.unlock();
1380 return (v<1);
1381 }
1382
workerThread(GThreadData & td)1383 void workerThread(GThreadData& td) {
1384 GPVec<BundleData>* bundleQueue = (GPVec<BundleData>*)td.udata;
1385 //wait for a ready bundle in the queue, until there is no hope for incoming bundles
1386 DBGPRINT2("---->> Thread%d starting..\n",td.thread->get_id());
1387 DBGPRINT2("---->> Thread%d locking queueMutex..\n",td.thread->get_id());
1388 queueMutex.lock(); //enter wait-for-notification loop
1389 while (bundleWork) {
1390 DBGPRINT3("---->> Thread%d: waiting.. (queue len=%d)\n",td.thread->get_id(), bundleQueue->Count());
1391 waitMutex.lock();
1392 threadsWaiting++;
1393 queueMutex.unlock();
1394 waitMutex.unlock();
1395 haveThreads.notify_one(); //in case main thread is waiting
1396 current_thread::yield();
1397 queueMutex.lock();
1398 while (bundleWork && bundleQueue->Count()==0) {
1399 haveBundles.wait(queueMutex);//unlocks queueMutex and wait until notified
1400 //when notified, locks queueMutex and resume
1401 }
1402 waitMutex.lock();
1403 if (threadsWaiting>0) threadsWaiting--;
1404 waitMutex.unlock();
1405 DBGPRINT3("---->> Thread%d: awakened! (queue len=%d)\n",td.thread->get_id(),bundleQueue->Count());
1406 BundleData* readyBundle=NULL;
1407 if ((bundleWork & 0x02)!=0 && (readyBundle=bundleQueue->Pop())!=NULL) { //is bit 1 set?
1408 if (bundleQueue->Count()==0)
1409 bundleWork &= ~(int)0x02; //clear bit 1 (queue is empty)
1410 //Num_Fragments+=readyBundle->num_fragments;
1411 //Frag_Len+=readyBundle->frag_len;
1412 queueMutex.unlock();
1413 processBundle(readyBundle);
1414 DBGPRINT3("---->> Thread%d processed bundle #%d, now locking back dataMutex and queueMutex\n",
1415 td.thread->get_id(), readyBundle->idx);
1416 dataMutex.lock();
1417 dataClear.Push(readyBundle->idx);
1418 DBGPRINT3("---->> Thread%d pushed bundle #%d into dataClear",
1419 td.thread->get_id(), readyBundle->idx);
1420 dataMutex.unlock();
1421 DBGPRINT2("---->> Thread%d informing main thread and yielding", td.thread->get_id());
1422 haveClear.notify_one(); //inform main thread
1423 current_thread::yield();
1424 DBGPRINT2("---->> Thread%d processed bundle, now locking back queueMutex\n", td.thread->get_id());
1425 queueMutex.lock();
1426 DBGPRINT2("---->> Thread%d locked back queueMutex\n", td.thread->get_id());
1427
1428 }
1429 //haveThreads.notify_one();
1430 } //while there is reason to live
1431 queueMutex.unlock();
1432 DBGPRINT2("---->> Thread%d DONE.\n", td.thread->get_id());
1433 }
1434
1435 //prepare the next available bundle slot for loading
waitForData(BundleData * bundles)1436 int waitForData(BundleData* bundles) {
1437 int bidx=-1;
1438 dataMutex.lock();
1439 DBGPRINT(" #waitForData: locking dataMutex");
1440 while (dataClear.Count()==0) {
1441 DBGPRINT(" #waitForData: dataClear.Count is 0, waiting for dataMutex");
1442 haveClear.wait(dataMutex);
1443 }
1444 bidx=dataClear.Pop();
1445 if (bidx>=0) {
1446 bundles[bidx].status=BUNDLE_STATUS_LOADING;
1447 }
1448
1449 DBGPRINT(" #waitForData: unlocking dataMutex");
1450 dataMutex.unlock();
1451 return bidx;
1452 }
1453
1454 #endif
1455
writeUnbundledGenes(GHash<CGene> & geneabs,const char * refseq,FILE * gout)1456 void writeUnbundledGenes(GHash<CGene>& geneabs, const char* refseq, FILE* gout) {
1457 //write unbundled genes from this chromosome
1458 geneabs.startIterate();
1459 while (CGene* g=geneabs.NextData()) {
1460 const char* geneID=g->geneID;
1461 const char* geneName=g->geneName;
1462 if (geneID==NULL) geneID=".";
1463 if (geneName==NULL) geneName=".";
1464 fprintf(gout, "%s\t%s\t%s\t%c\t%d\t%d\t0.0\t0.0\t0.0\n",
1465 geneID, geneName, refseq,
1466 g->strand, g->start, g->end);
1467 }
1468 geneabs.Clear();
1469 }
1470
writeUnbundledGuides(GVec<GRefData> & refdata,FILE * fout,FILE * gout)1471 void writeUnbundledGuides(GVec<GRefData>& refdata, FILE* fout, FILE* gout) {
1472 for (int g=0;g<refdata.Count();++g) {
1473 GRefData& crefd=refdata[g];
1474 if (crefd.rnas.Count()==0) continue;
1475 GHash<CGene> geneabs;
1476 //gene_id abundances (0), accumulating coords
1477 for (int m=0;m<crefd.rnas.Count();++m) {
1478 GffObj &t = *crefd.rnas[m];
1479 RC_TData &td = *(RC_TData*) (t.uptr);
1480 if (td.in_bundle) {
1481 if (gout && m==crefd.rnas.Count()-1)
1482 writeUnbundledGenes(geneabs, crefd.gseq_name, gout);
1483 continue;
1484 }
1485 //write these guides to output
1486 //for --merge and -e
1487 if (mergeMode || eonly) {
1488 fprintf(fout, "%s\t%s\ttranscript\t%d\t%d\t.\t%c\t.\t",
1489 crefd.gseq_name, t.getTrackName(), t.start, t.end, t.strand);
1490 if (t.getGeneID())
1491 fprintf(fout, "gene_id \"%s\"; ", t.getGeneID());
1492 fprintf(fout, "transcript_id \"%s\";",t.getID());
1493 if (eonly) {
1494 if (t.getGeneName())
1495 fprintf(fout, " gene_name \"%s\";", t.getGeneName());
1496 fprintf(fout, " cov \"0.0\"; FPKM \"0.0\"; TPM \"0.0\";");
1497 }
1498 else { //merge_mode
1499 if (t.getGeneName())
1500 fprintf(fout, " gene_name \"%s\";", t.getGeneName());
1501 if (t.getGeneID())
1502 fprintf(fout, " ref_gene_id \"%s\";", t.getGeneID());
1503 }
1504 fprintf(fout, "\n");
1505 for (int e=0;e<t.exons.Count();++e) {
1506 fprintf(fout,"%s\t%s\texon\t%d\t%d\t.\t%c\t.\t",
1507 crefd.gseq_name, t.getTrackName(), t.exons[e]->start, t.exons[e]->end, t.strand);
1508 if (t.getGeneID())
1509 fprintf(fout, "gene_id \"%s\"; ", t.getGeneID());
1510 fprintf(fout,"transcript_id \"%s\"; exon_number \"%d\";",
1511 t.getID(), e+1);
1512
1513 if (t.getGeneName())
1514 fprintf(fout, " gene_name \"%s\";", t.getGeneName());
1515 if (eonly) {
1516 fprintf(fout, " cov \"0.0\";");
1517 }
1518 fprintf(fout, "\n");
1519 }
1520 }
1521 if (gout!=NULL) {
1522 //gather coords for this gene_id
1523 char* geneid=t.getGeneID();
1524 if (geneid==NULL) geneid=t.getGeneName();
1525 if (geneid!=NULL) {
1526 CGene* gloc=geneabs.Find(geneid);
1527 if (gloc) {
1528 if (gloc->strand!=t.strand)
1529 GMessage("Warning: gene \"%s\" (on %s) has reference transcripts on both strands?\n",
1530 geneid, crefd.gseq_name);
1531 if (t.start<gloc->start) gloc->start=t.start;
1532 if (t.end>gloc->end) gloc->end=t.end;
1533 } else {
1534 //add new geneid locus
1535 geneabs.Add(geneid, new CGene(t.start, t.end, t.strand, t.getGeneID(), t.getGeneName()));
1536 }
1537 }
1538 if (m==crefd.rnas.Count()-1)
1539 writeUnbundledGenes(geneabs, crefd.gseq_name, gout);
1540 } //if geneabundance
1541 }
1542 }
1543 }
1544
1545
1546
1547
1548