1 package jgi; 2 3 import java.io.File; 4 import java.util.ArrayList; 5 import java.util.Locale; 6 import java.util.concurrent.atomic.AtomicIntegerArray; 7 8 import aligner.Aligner; 9 import aligner.MultiStateAligner9PacBioAdapter2; 10 import aligner.SingleStateAlignerFlat; 11 import aligner.SingleStateAlignerFlat2; 12 import aligner.SingleStateAlignerFlat2Amino; 13 import aligner.SingleStateAlignerFlat2_1D; 14 import dna.AminoAcid; 15 import fileIO.FileFormat; 16 import fileIO.ReadWrite; 17 import shared.Parse; 18 import shared.Parser; 19 import shared.PreParser; 20 import shared.Shared; 21 import shared.Timer; 22 import shared.Tools; 23 import stream.ConcurrentReadInputStream; 24 import stream.ConcurrentReadOutputStream; 25 import stream.FastaReadInputStream; 26 import stream.Read; 27 import stream.ReadStreamWriter; 28 import stream.SamLine; 29 import stream.SiteScore; 30 import structures.ByteBuilder; 31 import structures.ListNum; 32 import template.Accumulator; 33 import template.ThreadWaiter; 34 35 /** 36 * @author Brian Bushnell 37 * @date Oct 6, 2014 38 * 39 */ 40 public class FindPrimers implements Accumulator<FindPrimers.ProcessThread> { 41 main(String[] args)42 public static void main(String[] args){ 43 Timer t=new Timer(); 44 FindPrimers x=new FindPrimers(args); 45 x.process(t); 46 47 //Close the print stream if it was redirected 48 Shared.closeStream(x.outstream); 49 } 50 FindPrimers(String[] args)51 public FindPrimers(String[] args){ 52 53 {//Preparse block for help, config files, and outstream 54 PreParser pp=new PreParser(args, getClass(), false); 55 args=pp.args; 56 outstream=pp.outstream; 57 } 58 59 Shared.capBufferLen(8); 60 61 float cutoff_=0; 62 String literal_=null; 63 String ref_=null; 64 Parser parser=new Parser(); 65 for(int i=0; i<args.length; i++){ 66 String arg=args[i]; 67 String[] split=arg.split("="); 68 String a=split[0].toLowerCase(); 69 String b=split.length>1 ? split[1] : null; 70 71 if(a.equals("rcomp")){ 72 rcomp=Parse.parseBoolean(b); 73 }else if(a.equals("usemsa2") || a.equals("msa2")){ 74 useMSA2=Parse.parseBoolean(b); 75 }else if(a.equals("usessa2") || a.equals("ssa2")){ 76 useSSA2=Parse.parseBoolean(b); 77 }else if(a.equalsIgnoreCase("usessa1d") || a.equalsIgnoreCase("ssa1d")){ 78 useSSA1D=Parse.parseBoolean(b); 79 }else if(a.equals("swap")){ 80 swapQuery=Parse.parseBoolean(b); 81 }else if(a.equals("printzeros")){ 82 printZeros=Parse.parseBoolean(b); 83 }else if(a.equals("twocolumn") || a.equals("2column")){ 84 oneColumn=!Parse.parseBoolean(b); 85 }else if(a.equals("onecolumn")){ 86 oneColumn=Parse.parseBoolean(b); 87 }else if(a.equals("addr")){ 88 addR=Parse.parseBoolean(b); 89 }else if(a.equals("replicate") || a.equals("expand")){ 90 replicateAmbiguous=Parse.parseBoolean(b); 91 }else if(a.equals("literal")){ 92 literal_=b; 93 }else if(a.equals("cutoff") || a.equals("minid")){ 94 cutoff_=Float.parseFloat(b); 95 if(cutoff_>1){cutoff_=cutoff_/100;} 96 assert(cutoff_>=0 && cutoff_<=1) : "Cutoff should range from 0 to 1"; 97 }else if(a.equals("primer") || a.equals("query") || a.equals("ref")){ 98 if(new File(b).exists()){ref_=b;} 99 else{literal_=b;} 100 }else if(a.equals("idhist")){ 101 outIdHist=b; 102 }else if(a.equals("parse_flag_goes_here")){ 103 //Set a variable here 104 }else if(parser.parse(arg, a, b)){ 105 //do nothing 106 }else{ 107 outstream.println("Unknown parameter "+args[i]); 108 assert(false) : "Unknown parameter "+args[i]; 109 // throw new RuntimeException("Unknown parameter "+args[i]); 110 } 111 } 112 113 {//Process parser fields 114 Parser.processQuality(); 115 116 maxReads=parser.maxReads; 117 in1=parser.in1; 118 out1=parser.out1; 119 } 120 cutoff=cutoff_; 121 122 // ArrayList<byte[]> sharedHeader=new ArrayList<byte[]>(); 123 ByteBuilder sharedHeader=new ByteBuilder(); 124 sharedHeader.append("@HD\tVN:1.4\tSO:unsorted\n"); 125 if(ref_!=null){ 126 ArrayList<Read> list=FastaReadInputStream.toReads(ref_, FileFormat.FASTA, -1); 127 int max=0; 128 queries=new ArrayList<Read>(); 129 // assert(false) : list; 130 for(int i=0; i<list.size(); i++){ 131 Read r=list.get(i); 132 max=Tools.max(max, r.length()); 133 if(swapQuery){ 134 sharedHeader.append("@SQ\tSN:"+r.name()+"\tLN:"+r.length()).nl(); 135 } 136 queries.add(r); 137 // if(rcomp){ 138 // r=r.copy(); 139 // r.reverseComplement(); 140 // if(addR){r.id="r_"+r.id;} 141 // r.setStrand(1); 142 // queries.add(r); 143 // } 144 } 145 maxqlen=max; 146 }else if(literal_!=null){ 147 int max=0; 148 String[] s2=literal_.split(","); 149 queries=new ArrayList<Read>(); 150 for(int i=0; i<s2.length; i++){ 151 Read r=new Read(s2[i].getBytes(), null, "query"+i, i); 152 max=Tools.max(max, r.length()); 153 queries.add(r); 154 if(swapQuery){ 155 sharedHeader.append("@SQ\tSN:"+r.name()+"\tLN:"+r.length()).nl(); 156 } 157 } 158 maxqlen=max; 159 }else{ 160 queries=null; 161 maxqlen=0; 162 } 163 164 if(sharedHeader.length()>0) { 165 ReadStreamWriter.HEADER=sharedHeader; 166 } 167 168 if(replicateAmbiguous){ 169 queries=Tools.replicateAmbiguous(queries, 1); 170 } 171 if(Shared.AMINO_IN){rcomp=false;} 172 if(rcomp){ 173 for(int i=0, max=queries.size(); i<max; i++){ 174 Read r=queries.get(i).copy(); 175 r.reverseComplement(); 176 if(addR){r.id="r_"+r.id;} 177 r.setStrand(1); 178 queries.add(r); 179 } 180 } 181 182 // assert(false) : queries; 183 184 ffout1=FileFormat.testOutput(out1, FileFormat.ATTACHMENT, "attachment", true, true, false, ordered); 185 // assert(ffout1.type()==FileFormat.ATTACHMENT) : ffout1; 186 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, null, true, true); 187 } 188 process(Timer t)189 void process(Timer t){ 190 191 final ConcurrentReadInputStream cris; 192 { 193 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, null); 194 if(verbose){outstream.println("Started cris");} 195 cris.start(); //4567 196 } 197 boolean paired=cris.paired(); 198 199 final ConcurrentReadOutputStream ros=makeCros(); 200 201 // final ByteStreamWriter bsw; 202 // if(out1!=null){ 203 // 204 // assert(!out1.equalsIgnoreCase(in1) && !out1.equalsIgnoreCase(in1)) : "Input file and output file have same name."; 205 // 206 // bsw=new ByteStreamWriter(ffout1); 207 // bsw.start(); 208 // }else{bsw=null;} 209 210 spawnThreads(cris, ros); 211 212 // if(bsw!=null){bsw.poisonAndWait();} 213 ReadWrite.closeStreams(cris, ros); 214 215 int minid=100; 216 { 217 ByteBuilder bb=new ByteBuilder(); 218 bb.append(oneColumn ? ReadWrite.stripToCore(outIdHist) : "ID\tCount").nl(); 219 for(int i=0; i<idHist.length(); i++){ 220 int count=idHist.get(i); 221 if(count>0 || printZeros) { 222 if(!oneColumn){bb.append(i).tab();} 223 bb.append(count).nl(); 224 if(count>0){minid=Tools.min(minid, i);} 225 } 226 } 227 if(outIdHist!=null){ReadWrite.writeString(bb, outIdHist);} 228 } 229 230 if(verbose){outstream.println("Finished.");} 231 232 t.stop(); 233 outstream.println("Time: \t"+t); 234 outstream.println("Reads Processed: "+readsProcessed+" \t"+String.format(Locale.ROOT, "%.2fk reads/sec", (readsProcessed/(double)(t.elapsed))*1000000)); 235 outstream.println("Average Identity: "+String.format(Locale.ROOT, "%.3f%%", (identitySum*100/identityCount))); 236 outstream.println("Min Identity: "+minid); 237 } 238 makeCros()239 private ConcurrentReadOutputStream makeCros(){ 240 if(ffout1==null){return null;} 241 242 //Select output buffer size based on whether it needs to be ordered 243 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); 244 245 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, buff, null, true); 246 ros.start(); //Start the stream 247 return ros; 248 } 249 250 /*--------------------------------------------------------------*/ 251 /*---------------- Thread Management ----------------*/ 252 /*--------------------------------------------------------------*/ 253 254 /** Spawn process threads */ spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros)255 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){ 256 257 //Do anything necessary prior to processing 258 259 //Determine how many threads may be used 260 final int threads=Shared.threads(); 261 262 //Fill a list with ProcessThreads 263 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); 264 for(int i=0; i<threads; i++){ 265 alpt.add(new ProcessThread(cris, ros, i)); 266 } 267 268 //Start the threads and wait for them to finish 269 boolean success=ThreadWaiter.startAndWait(alpt, this); 270 errorState&=!success; 271 272 //Do anything necessary after processing 273 274 } 275 276 @Override accumulate(ProcessThread pt)277 public final void accumulate(ProcessThread pt){ 278 readsProcessed+=pt.readsProcessedT; 279 basesProcessed+=pt.basesProcessedT; 280 readsOut+=pt.readsOutT; 281 basesOut+=pt.basesOutT; 282 identitySum+=pt.identitySumT; 283 identityCount+=pt.identityCountT; 284 errorState|=(!pt.success); 285 } 286 287 @Override success()288 public final boolean success(){return !errorState;} 289 290 /*--------------------------------------------------------------*/ 291 /*---------------- Inner Classes ----------------*/ 292 /*--------------------------------------------------------------*/ 293 294 /** This class is static to prevent accidental writing to shared variables. 295 * It is safe to remove the static modifier. */ 296 class ProcessThread extends Thread { 297 298 //Constructor ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_)299 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_){ 300 cris=cris_; 301 ros=ros_; 302 tid=tid_; 303 } 304 305 //Called by start() 306 @Override run()307 public void run(){ 308 //Do anything necessary prior to processing 309 310 //Process the reads 311 processInner(); 312 313 //Do anything necessary after processing 314 315 //Indicate successful exit status 316 success=true; 317 } 318 319 /** Iterate through the reads */ processInner()320 void processInner(){ 321 322 //Grab the first ListNum of reads 323 ListNum<Read> ln=cris.nextList(); 324 325 //Check to ensure pairing is as expected 326 if(ln!=null && !ln.isEmpty()){ 327 Read r=ln.get(0); 328 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access 329 } 330 331 //As long as there is a nonempty read list... 332 while(ln!=null && ln.size()>0){ 333 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access 334 335 processList(ln); 336 337 //Notify the input stream that the list was used 338 cris.returnList(ln); 339 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access 340 341 //Fetch a new list 342 ln=cris.nextList(); 343 } 344 345 //Notify the input stream that the final list was used 346 if(ln!=null){ 347 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); 348 } 349 } 350 processList(ListNum<Read> ln)351 void processList(ListNum<Read> ln){ 352 353 //Grab the actual read list from the ListNum 354 final ArrayList<Read> reads=ln.list; 355 356 //Loop through each read in the list 357 for(int idx=0; idx<reads.size(); idx++){ 358 final Read r=reads.get(idx); 359 assert(r.mate==null); 360 361 //Validate reads in worker threads 362 if(!r.validated()){r.validate(true);} 363 364 //Track the initial length for statistics 365 final int initialLength1=r.length(); 366 367 //Increment counters 368 readsProcessedT+=r.pairCount(); 369 basesProcessedT+=initialLength1; 370 371 { 372 //Reads are processed in this block. 373 boolean keep=processRead(r); 374 375 if(!keep){reads.set(idx, null);} 376 else{ 377 readsOutT+=r.pairCount(); 378 basesOutT+=r.pairLength(); 379 } 380 } 381 } 382 383 //Output reads to the output stream 384 if(ros!=null){ros.add(reads, ln.id);} 385 } 386 387 /** 388 * Process a read or a read pair. 389 * @param r1 Read 1 390 * @param r2 Read 2 (may be null) 391 * @return True if the reads should be kept, false if they should be discarded. 392 */ processRead(final Read r)393 boolean processRead(final Read r){ 394 395 final int a=0, b=(swapQuery ? maxqlen-1 : r.length()-1); 396 int[] max; 397 398 399 SiteScore bestSite=null; 400 Read bestQuery=null; 401 for(int qnum=0; qnum<queries.size(); qnum++){ 402 final Read query=queries.get(qnum); 403 404 // System.err.println("msa.maxColumns()="+msa.maxColumns()+", msa.maxRows()="+msa.maxRows()+ 405 // ", maxqlen="+maxqlen+", rlen="+r.length()+", qlen="+query.length()); 406 407 408 //{rows, maxCol, maxState, maxScore, maxStart} 409 if(swapQuery){ 410 max=msa.fillUnlimited(r.bases, query.bases, a, b, -999999); 411 }else{ 412 max=msa.fillUnlimited(query.bases, r.bases, a, b, -999999); 413 } 414 // assert(false) : new String(r.bases)+"\n"+new String(queries.get(0).bases); 415 416 // System.err.println("\nAligned: "+new String(r.bases).substring(0, 10)+"..."+new String(r.bases).substring(r.bases.length-11)+ 417 // ", "+new String(query.bases).substring(0, 10)+"..."+new String(query.bases).substring(query.bases.length-11)); 418 419 if(max!=null){ 420 421 final int rows=max[0]; 422 final int maxCol=max[1]; 423 final int maxState=max[2]; 424 // final int maxScore=max[3]; 425 // final int maxStart=max[4]; 426 427 //{score, bestRefStart, bestRefStop} 428 //byte[] read, byte[] ref, int refStartLoc, int refEndLoc, int maxRow, int maxCol, int maxState 429 // int[] score=msa.score(query.bases, r.bases, a, b, max[0], max[1], max[2]); 430 int[] score; 431 if(swapQuery){ 432 score=msa.score(r.bases, query.bases, a, b, rows, maxCol, maxState); 433 }else{ 434 //returns {score, bestRefStart, bestRefStop} 435 score=msa.score(query.bases, r.bases, a, b, rows, maxCol, maxState); 436 } 437 SiteScore ss=new SiteScore(1, query.strand(), score[1], score[2], 1, score[0]); 438 // System.err.println("Score: "+ss.quickScore); 439 if(bestSite==null || ss.quickScore>bestSite.quickScore){ 440 bestQuery=query; 441 ss.setSlowScore(ss.quickScore); 442 ss.score=ss.quickScore; 443 // ss.match=msa.traceback(query.bases, r.bases, a, b, score[3], score[4], score[5], false); 444 445 //(byte[] read, byte[] ref, int refStartLoc, int refEndLoc, int row, int col, int state) 446 //int refStartLoc, int refEndLoc, int row, int col, int state 447 if(swapQuery){ 448 ss.match=msa.traceback(r.bases, query.bases, a, b, rows, maxCol, maxState); 449 }else{ 450 ss.match=msa.traceback(query.bases, r.bases, a, b, rows, maxCol, maxState); 451 } 452 // System.err.println(new String(ss.match)); 453 // System.err.println(Read.identity(ss.match)); 454 bestSite=ss; 455 } 456 } 457 } 458 459 ByteBuilder bb2=new ByteBuilder(toBytes(r, bestQuery, bestSite)); 460 r.obj=bb2; 461 462 return true; 463 } 464 465 466 /*--------------------------------------------------------------*/ 467 toBytes(Read r, Read query, SiteScore ss)468 private ByteBuilder toBytes(Read r, Read query, SiteScore ss){ 469 bb.clear(); 470 if(ss==null){return toBytes(r, query);} 471 float identity=Read.identity(ss.match); 472 idHist.incrementAndGet((int)(100*identity)); 473 if(identity<cutoff){return toBytes(r, query);} 474 if(swapQuery){ 475 bb.append(r.id).append('\t'); 476 }else{ 477 bb.append(query.id).append('\t'); 478 } 479 bb.append(makeFlag(ss)).append('\t'); 480 if(swapQuery){ 481 bb.append(query.id).append('\t'); 482 }else{ 483 bb.append(r.id.replace('\t', '_')).append('\t'); 484 } 485 bb.append(Tools.max(0, ss.start)+1).append('\t'); 486 bb.append(Tools.max(ss.score/query.length(), 4)).append('\t'); 487 String cigar; 488 if(swapQuery){ 489 cigar=SamLine.toCigar14(ss.match, ss.start, ss.stop, query.length(), r.bases); 490 // assert(SamLine.calcCigarReadLength(cigar, true, false)==r.length()) : 491 // "cigar="+SamLine.calcCigarReadLength(cigar, true, false)+", r="+r.length()+", q="+query.length()+", swap="+swapQuery+", rid="+r.id; 492 }else{ 493 cigar=SamLine.toCigar14(ss.match, ss.start, ss.stop, r.length(), query.bases); 494 // assert(SamLine.calcCigarReadLength(cigar, true, false)==query.length()) : 495 // "cigar="+SamLine.calcCigarReadLength(cigar, true, false)+", r="+r.length()+", q="+query.length()+", swap="+swapQuery+", rid="+r.id; 496 } 497 if(cigar==null){bb.append('*').append('\t');}else{bb.append(cigar).append('\t');} 498 bb.append('*').append('\t'); 499 bb.append('0').append('\t'); 500 bb.append('0').append('\t'); 501 502 if(swapQuery){ 503 if(ss.strand()==Shared.MINUS){ 504 for(int i=r.bases.length-1; i>=0; i--){bb.append(AminoAcid.baseToComplementExtended[r.bases[i]]);} 505 bb.tab(); 506 }else{ 507 bb.append(r.bases).append('\t'); 508 } 509 }else{ 510 bb.append(query.bases).append('\t'); 511 } 512 // assert(ss.strand()!=Shared.MINUS) : bb+"\n"+new String(r.bases); 513 bb.append('*').append('\t'); 514 515 identitySumT+=identity; 516 identityCountT++; 517 bb.append("YI:f:").append(100*identity, 2); 518 519 return bb; 520 } 521 toBytes(Read r, Read query)522 private ByteBuilder toBytes(Read r, Read query){ 523 bb.clear(); 524 // if(cutoff>0){return bb;} 525 if(swapQuery){ 526 bb.append(r.id).append('\t'); 527 }else{ 528 bb.append(query.id).append('\t'); 529 } 530 bb.append(4).append('\t'); 531 bb.append('*').append('\t'); 532 bb.append(0).append('\t'); 533 bb.append(0).append('\t'); 534 bb.append('*').append('\t'); 535 bb.append('*').append('\t'); 536 bb.append('0').append('\t'); 537 bb.append('0').append('\t'); 538 539 if(swapQuery){ 540 bb.append(r.bases).append('\t'); 541 }else{ 542 bb.append(query.bases).append('\t'); 543 } 544 bb.append('*').append('\t'); 545 546 // identitySumT+=f; 547 // identityCountT++; 548 // bb.append("YI:f:").append(100*f, 2); 549 550 return bb; 551 } 552 553 //This seems to be up to 30% faster if the raw class rather than interface is used. 554 Aligner msa=Shared.AMINO_IN ? new SingleStateAlignerFlat2Amino() : useMSA2 ? new MultiStateAligner9PacBioAdapter2() : 555 useSSA1D ? new SingleStateAlignerFlat2_1D() : useSSA2 ? new SingleStateAlignerFlat2() : new SingleStateAlignerFlat(); 556 // SingleStateAlignerFlat msa=new SingleStateAlignerFlat(maxqlen+5, maxqlen+5); 557 558 SingleStateAlignerFlat xxx0; //Never fastest 559 MultiStateAligner9PacBioAdapter2 xxx1; //Best for indels, but slowest 560 SingleStateAlignerFlat2 xxx2; //Fastest for 16S 561 SingleStateAlignerFlat2_1D xxx3; //Fastest for 23S 562 Aligner xxx9; 563 564 /** Number of reads processed by this thread */ 565 protected long readsProcessedT=0; 566 /** Number of bases processed by this thread */ 567 protected long basesProcessedT=0; 568 569 /** Number of reads retained by this thread */ 570 protected long readsOutT=0; 571 /** Number of bases retained by this thread */ 572 protected long basesOutT=0; 573 574 double identitySumT=0; 575 long identityCountT=0; 576 577 /** True only if this thread has completed successfully */ 578 boolean success=false; 579 580 private ByteBuilder bb=new ByteBuilder(1000); 581 582 /** Shared input stream */ 583 private final ConcurrentReadInputStream cris; 584 /** Shared output stream */ 585 private final ConcurrentReadOutputStream ros; 586 /** Thread ID */ 587 final int tid; 588 } 589 makeFlag(SiteScore ss)590 public static int makeFlag(SiteScore ss){ 591 int flag=0; 592 if(ss.strand()==Shared.MINUS){flag|=0x10;} 593 return flag; 594 } 595 596 597 598 /*--------------------------------------------------------------*/ 599 600 /*--------------------------------------------------------------*/ 601 602 private String in1=null; 603 private String out1=null; 604 private String outIdHist=null; 605 606 private AtomicIntegerArray idHist=new AtomicIntegerArray(101); 607 608 private final float cutoff; 609 private boolean rcomp=true; 610 private boolean replicateAmbiguous=false; 611 private boolean swapQuery=false; 612 private boolean addR=false; 613 private boolean printZeros=true; 614 private boolean oneColumn=false; 615 private boolean useMSA2=false; 616 private boolean useSSA2=true; 617 private boolean useSSA1D=false; 618 619 private final FileFormat ffin1; 620 private final FileFormat ffout1; 621 private ArrayList<Read> queries; 622 private final int maxqlen; 623 624 /*--------------------------------------------------------------*/ 625 626 protected long readsProcessed=0; 627 protected long basesProcessed=0; 628 629 protected long readsOut=0; 630 protected long basesOut=0; 631 632 private long maxReads=-1; 633 634 double identitySum=0; 635 long identityCount=0; 636 637 boolean ordered=true; 638 boolean errorState=false; 639 640 /*--------------------------------------------------------------*/ 641 642 private java.io.PrintStream outstream=System.err; 643 public static boolean verbose=false; 644 645 } 646