1 package clump; 2 3 import java.io.File; 4 import java.io.PrintStream; 5 import java.util.ArrayList; 6 7 import bloom.KCountArray; 8 import fileIO.ByteFile; 9 import fileIO.FileFormat; 10 import fileIO.ReadWrite; 11 import jgi.BBMerge; 12 import shared.KillSwitch; 13 import shared.Parse; 14 import shared.Parser; 15 import shared.PreParser; 16 import shared.ReadStats; 17 import shared.Shared; 18 import shared.Timer; 19 import shared.Tools; 20 import stream.ConcurrentReadInputStream; 21 import stream.ConcurrentReadOutputStream; 22 import stream.FASTQ; 23 import stream.FastaReadInputStream; 24 import stream.Read; 25 import structures.ListNum; 26 import structures.Quantizer; 27 28 /** 29 * @author Brian Bushnell 30 * @date June 20, 2014 31 * 32 */ 33 public class KmerSplit { 34 35 /*--------------------------------------------------------------*/ 36 /*---------------- Initialization ----------------*/ 37 /*--------------------------------------------------------------*/ 38 39 /** 40 * Code entrance from the command line. 41 * @param args Command line arguments 42 */ main(String[] args)43 public static void main(String[] args){ 44 final boolean pigz=ReadWrite.USE_PIGZ, unpigz=ReadWrite.USE_UNPIGZ; 45 final boolean oldFInt=FASTQ.FORCE_INTERLEAVED, oldTInt=FASTQ.TEST_INTERLEAVED; 46 final int zl=ReadWrite.ZIPLEVEL; 47 final float ztd=ReadWrite.ZIP_THREAD_MULT; 48 final int mzt=ReadWrite.MAX_ZIP_THREADS; 49 Timer t=new Timer(); 50 KmerSplit x=new KmerSplit(args); 51 ReadWrite.ZIPLEVEL=Tools.min(ReadWrite.ZIPLEVEL, maxZipLevel); 52 x.process(t); 53 ReadWrite.USE_PIGZ=pigz; 54 ReadWrite.USE_UNPIGZ=unpigz; 55 ReadWrite.ZIPLEVEL=zl; 56 ReadWrite.ZIP_THREAD_MULT=ztd; 57 ReadWrite.MAX_ZIP_THREADS=mzt; 58 FASTQ.FORCE_INTERLEAVED=oldFInt; 59 FASTQ.TEST_INTERLEAVED=oldTInt; 60 61 //Close the print stream if it was redirected 62 Shared.closeStream(x.outstream); 63 } 64 65 /** 66 * Constructor. 67 * @param args Command line arguments 68 */ KmerSplit(String[] args)69 public KmerSplit(String[] args){ 70 71 {//Preparse block for help, config files, and outstream 72 PreParser pp=new PreParser(args, getClass(), false); 73 args=pp.args; 74 outstream=pp.outstream; 75 } 76 77 ReadWrite.USE_PIGZ=false; 78 ReadWrite.USE_UNPIGZ=true; 79 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 80 81 boolean setInterleaved=false; //Whether it was explicitly set. 82 Parser parser=new Parser(); 83 84 for(int i=0; i<args.length; i++){ 85 String arg=args[i]; 86 String[] split=arg.split("="); 87 String a=split[0].toLowerCase(); 88 String b=split.length>1 ? split[1] : null; 89 90 if(parser.parse(arg, a, b)){ 91 //do nothing 92 }else if(a.equals("verbose")){ 93 verbose=KmerComparator.verbose=Parse.parseBoolean(b); 94 }else if(a.equals("parse_flag_goes_here")){ 95 //Set a variable here 96 }else if(a.equals("k")){ 97 k=Integer.parseInt(b); 98 assert(k>0 && k<32); 99 }else if(a.equals("mincount") || a.equals("mincr")){ 100 minCount=Integer.parseInt(b); 101 }else if(a.equals("groups") || a.equals("g") || a.equals("sets") || a.equals("ways")){ 102 groups=Integer.parseInt(b); 103 }else if(a.equals("rename") || a.equals("addname")){ 104 //Do nothing 105 // addName=Parse.parseBoolean(b); 106 }else if(a.equals("shortname") || a.equals("shortnames")){ 107 if(b!=null && b.equals("shrink")){ 108 shrinkName=true; 109 }else{ 110 shrinkName=false; 111 shortName=Parse.parseBoolean(b); 112 } 113 }else if(a.equals("rcomp") || a.equals("reversecomplement")){ 114 //ignore rcomp=Parse.parseBoolean(b); 115 }else if(a.equals("condense") || a.equals("consensus") || a.equals("concensus")){//Note the last one is intentionally misspelled 116 //ignore 117 }else if(a.equals("correct") || a.equals("ecc")){ 118 //ignore 119 }else if(a.equals("passes")){ 120 int x=Integer.parseInt(b); 121 // if(x>1){outstream.println("Warning: KmerSplit does not support multiple passes.");} 122 } 123 124 else if(a.equals("dedupe")){ 125 //ignore 126 }else if(a.equals("markduplicates")){ 127 //ignore 128 }else if(a.equals("markall")){ 129 //ignore 130 }else if(a.equals("addcount") || a.equals("renamebycount")){ 131 //ignore 132 }else if(a.equals("optical") || a.equals("opticalonly")){ 133 //ignore 134 }else if(a.equals("dupesubs") || a.equals("duplicatesubs") || a.equals("dsubs") || a.equals("subs") || a.equals("s")){ 135 //ignore 136 }else if(a.equals("dupedist") || a.equals("duplicatedistance") || a.equals("ddist") || a.equals("dist") || a.equals("opticaldist") || a.equals("distance")){ 137 //ignore 138 }else if(a.equals("scanlimit") || a.equals("scan")){ 139 //ignore 140 }else if(a.equals("removeallduplicates") || a.equals("allduplicates")){ 141 //ignore 142 }else if(a.equals("allowns")){ 143 //ignore 144 }else if(a.equals("containment") || a.equals("absorbcontainment") || a.equals("ac") || a.equals("contains")){ 145 //ignore 146 }else if(a.equalsIgnoreCase("prefixOrSuffix") || a.equalsIgnoreCase("suffixOrPrefix") || a.equals("affix") || a.equals("pos")){ 147 //ignore 148 }else if(a.equals("printduplicates")){ 149 //ignore 150 }else if(a.equals("dupeidentity")){ 151 //ignore 152 }else if(a.equals("dupesubrate") || a.equals("dsr") || a.equals("subrate")){ 153 //ignore 154 } 155 156 else if(a.equals("prefilter")){ 157 KmerReduce.prefilter=Parse.parseBoolean(b); 158 }else if(a.equals("ecco")){ 159 ecco=Parse.parseBoolean(b); 160 }else if(a.equals("seed")){ 161 KmerComparator.defaultSeed=Long.parseLong(b); 162 }else if(a.equals("hashes")){ 163 KmerComparator.setHashes(Integer.parseInt(b)); 164 }else if(a.equals("border")){ 165 KmerComparator.defaultBorder=Integer.parseInt(b); 166 }else if(a.equals("minprob")){ 167 KmerComparator.minProb=Float.parseFloat(b); 168 }else if(a.equals("unpair")){ 169 unpair=Parse.parseBoolean(b); 170 }else if(a.equals("repair")){ 171 //Do nothing 172 }else if(a.equals("namesort") || a.equals("sort")){ 173 //Do nothing 174 }else if(a.equals("fetchthreads")){ 175 //Do nothing 176 }else if(a.equals("reorder") || a.equals("reorderclumps")){ 177 //reorder=Parse.parseBoolean(b); 178 }else if(a.equals("reorderpaired") || a.equals("reorderclumpspaired")){ 179 // reorderpaired=Parse.parseBoolean(b); 180 } 181 182 183 else if(Clump.parseStatic(arg, a, b)){ 184 //Do nothing 185 } 186 187 else{ 188 outstream.println("Unknown parameter "+args[i]); 189 assert(false) : "Unknown parameter "+args[i]; 190 // throw new RuntimeException("Unknown parameter "+args[i]); 191 } 192 } 193 194 {//Process parser fields 195 Parser.processQuality(); 196 197 maxReads=parser.maxReads; 198 199 overwrite=ReadStats.overwrite=parser.overwrite; 200 append=ReadStats.append=parser.append; 201 202 setInterleaved=parser.setInterleaved; 203 204 in1=parser.in1; 205 in2=parser.in2; 206 207 out1=parser.out1; 208 209 extin=parser.extin; 210 extout=parser.extout; 211 } 212 213 if(groups>2){ReadWrite.USE_PIGZ=false;} 214 215 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ 216 in2=in1.replace("#", "2"); 217 in1=in1.replace("#", "1"); 218 } 219 if(in2!=null){ 220 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} 221 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 222 } 223 224 assert(FastaReadInputStream.settingsOK()); 225 226 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} 227 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ 228 ByteFile.FORCE_MODE_BF2=true; 229 } 230 231 if(!setInterleaved){ 232 assert(in1!=null) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\n"; 233 if(in2!=null){ //If there are 2 input streams. 234 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 235 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 236 } 237 } 238 239 if(out1!=null && out1.equalsIgnoreCase("null")){out1=null;} 240 241 if(out1!=null){ 242 assert(out1.contains("%")); 243 outArray=new String[groups]; 244 for(int i=0; i<groups; i++){ 245 outArray[i]=out1.replaceFirst("%", ""+i); 246 } 247 if(!Tools.testOutputFiles(overwrite, append, false, outArray)){ 248 outstream.println((out1==null)+", "+out1); 249 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n"); 250 } 251 ffout=new FileFormat[groups]; 252 if(groups>1){ReadWrite.setZipThreadMult(Tools.min(0.5f, 2f/(groups+1)));} 253 for(int i=0; i<groups; i++){ 254 ffout[i]=FileFormat.testOutput(outArray[i], FileFormat.FASTQ, extout, groups<10, overwrite, append, false); 255 } 256 }else{ 257 outArray=null; 258 throw new RuntimeException("out is a required parameter."); 259 } 260 261 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); 262 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); 263 } 264 265 266 /*--------------------------------------------------------------*/ 267 /*---------------- Outer Methods ----------------*/ 268 /*--------------------------------------------------------------*/ 269 270 /** Count kmers */ preprocess()271 void preprocess(){ 272 if(minCount>1){ 273 table=ClumpTools.getTable(in1, in2, k, minCount); 274 } 275 } 276 277 /** Create read streams and process all data */ process(Timer t)278 void process(Timer t){ 279 280 preprocess(); 281 282 final ConcurrentReadInputStream cris; 283 { 284 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null); 285 cris.start(); 286 if(verbose){outstream.println("Started cris");} 287 } 288 boolean paired=cris.paired(); 289 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} 290 if(cris.paired() && (in1==null || !in1.contains(".sam") && !unpair)){ 291 outstream.println("Writing interleaved."); 292 } 293 294 final ConcurrentReadOutputStream ros[]=new ConcurrentReadOutputStream[groups]; 295 try { 296 for(int i=0; i<groups; i++){ 297 final int buff=8; 298 299 assert(!out1.equalsIgnoreCase(in1) && !out1.equalsIgnoreCase(in1)) : "Input file and output file have same name."; 300 301 ros[i]=ConcurrentReadOutputStream.getStream(ffout[i], null, null, null, buff, null, false); 302 ros[i].start(); 303 } 304 } catch (OutOfMemoryError e) { 305 KillSwitch.memKill(e); 306 } 307 308 readsProcessed=0; 309 basesProcessed=0; 310 311 //Process the read stream 312 processInner(cris, ros); 313 314 errorState|=ReadStats.writeAll(); 315 316 t.stop(); 317 318 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); 319 320 if(errorState){ 321 Clumpify.sharedErrorState=true; 322 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); 323 } 324 } 325 326 /** Collect and sort the reads */ processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream[] ros)327 void processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream[] ros){ 328 if(verbose){outstream.println("Making comparator.");} 329 KmerComparator kc=new KmerComparator(k, false, false); 330 if(verbose){outstream.println("Seed: "+kc.seed);} 331 332 if(verbose){outstream.println("Splitting reads.");} 333 splitReads(cris, ros, kc); 334 lastMemProcessed=memProcessed; 335 336 if(verbose){outstream.println("Done!");} 337 } 338 splitReads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream[] ros, final KmerComparator kc)339 public void splitReads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream[] ros, final KmerComparator kc){ 340 Timer t=new Timer(); 341 if(verbose){t.start("Making hash threads.");} 342 final int threads=Shared.threads(); 343 ArrayList<HashThread> alht=new ArrayList<HashThread>(threads); 344 for(int i=0; i<threads; i++){alht.add(new HashThread(i, cris, ros, kc));} 345 346 if(verbose){outstream.println("Starting threads.");} 347 for(HashThread ht : alht){ht.start();} 348 349 350 if(verbose){outstream.println("Waiting for threads.");} 351 /* Wait for threads to die */ 352 for(HashThread ht : alht){ 353 354 /* Wait for a thread to die */ 355 while(ht.getState()!=Thread.State.TERMINATED){ 356 try { 357 ht.join(); 358 } catch (InterruptedException e) { 359 e.printStackTrace(); 360 } 361 } 362 readsProcessed+=ht.readsProcessedT; 363 basesProcessed+=ht.basesProcessedT; 364 diskProcessed+=ht.diskProcessedT; 365 memProcessed+=ht.memProcessedT; 366 } 367 368 if(verbose){outstream.println("Closing streams.");} 369 errorState=ReadWrite.closeStreams(cris, ros)|errorState; 370 if(verbose){t.stop("Split time: ");} 371 } 372 373 /*--------------------------------------------------------------*/ 374 /*---------------- Inner Methods ----------------*/ 375 /*--------------------------------------------------------------*/ 376 377 /*--------------------------------------------------------------*/ 378 /*---------------- Inner Classes ----------------*/ 379 /*--------------------------------------------------------------*/ 380 381 private class HashThread extends Thread{ 382 HashThread(int id_, ConcurrentReadInputStream cris_, ConcurrentReadOutputStream[] ros_, KmerComparator kc_)383 HashThread(int id_, ConcurrentReadInputStream cris_, ConcurrentReadOutputStream[] ros_, KmerComparator kc_){ 384 id=id_; 385 cris=cris_; 386 ros=ros_; 387 kc=kc_; 388 } 389 390 @Override run()391 public void run(){ 392 393 final boolean paired=cris.paired(); 394 ListNum<Read> ln=cris.nextList(); 395 ArrayList<Read> reads=(ln!=null ? ln.list : null); 396 397 ArrayList<Read>[] array=new ArrayList[groups]; 398 for(int i=0; i<groups; i++){ 399 array[i]=new ArrayList<Read>(buffer); 400 } 401 402 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 403 404 for(Read r : reads){ 405 if(!r.validated()){ 406 r.validate(true); 407 if(r.mate!=null){r.mate.validate(true);} 408 } 409 readsProcessedT+=1+r.mateCount(); 410 basesProcessedT+=r.length()+r.mateLength(); 411 diskProcessedT+=r.countFastqBytes()+r.countMateFastqBytes(); 412 memProcessedT+=r.countBytes()+r.countMateBytes()+ReadKey.overhead; 413 if(shrinkName){ 414 Clumpify.shrinkName(r); 415 Clumpify.shrinkName(r.mate); 416 }else if(shortName){ 417 Clumpify.shortName(r); 418 Clumpify.shortName(r.mate); 419 } 420 421 if(quantizeQuality){ 422 Quantizer.quantize(r, r.mate); 423 } 424 } 425 426 if(ecco){ 427 for(Read r : reads){ 428 if(r.mate!=null){BBMerge.findOverlapStrict(r, r.mate, true);} 429 } 430 } 431 432 ArrayList<Read> hashList=reads; 433 if(paired && unpair){ 434 hashList=new ArrayList<Read>(reads.size()*2); 435 for(Read r1 : reads){ 436 Read r2=r1.mate; 437 hashList.add(r1); 438 hashList.add(r2); 439 r1.mate=null; 440 r2.mate=null; 441 } 442 } 443 444 kc.hash(hashList, table, minCount, true); 445 for(Read r : hashList){ 446 long kmer=((ReadKey)r.obj).kmer; 447 long code=kc.hash(kmer); 448 int code2=(int)(code%groups); 449 assert(code2>=0 && code2<array.length) : code2+", "+groups+", "+array.length+", "+kmer+", "+r.obj+"\n"+r; 450 array[code2].add(r); 451 if(array[code2].size()>=buffer){ 452 ros[code2].add(array[code2], 0); 453 array[code2]=new ArrayList<Read>(buffer); 454 } 455 } 456 cris.returnList(ln); 457 ln=cris.nextList(); 458 reads=(ln!=null ? ln.list : null); 459 } 460 if(ln!=null){ 461 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); 462 } 463 for(int i=0; i<groups; i++){ 464 if(!array[i].isEmpty()){ 465 ros[i].add(array[i], 0); 466 } 467 } 468 } 469 470 final int id; 471 final ConcurrentReadInputStream cris; 472 final ConcurrentReadOutputStream[] ros; 473 final KmerComparator kc; 474 static final int buffer=200; 475 476 protected long readsProcessedT=0; 477 protected long basesProcessedT=0; 478 protected long diskProcessedT=0; 479 protected long memProcessedT=0; 480 } 481 482 /*--------------------------------------------------------------*/ 483 /*---------------- Fields ----------------*/ 484 /*--------------------------------------------------------------*/ 485 486 private int k=31; 487 int groups=16; 488 int minCount=0; 489 490 KCountArray table=null; 491 492 /*--------------------------------------------------------------*/ 493 /*---------------- I/O Fields ----------------*/ 494 /*--------------------------------------------------------------*/ 495 496 private String in1=null; 497 private String in2=null; 498 499 private String out1=null; 500 private String[] outArray=null; 501 502 private String extin=null; 503 private String extout=null; 504 505 /*--------------------------------------------------------------*/ 506 507 protected long readsProcessed=0; 508 protected long basesProcessed=0; 509 protected long diskProcessed=0; 510 protected long memProcessed=0; 511 512 protected static long lastMemProcessed=0; 513 514 private long maxReads=-1; 515 // private boolean addName=false; 516 boolean shortName=false; 517 boolean shrinkName=false; 518 boolean ecco=false; 519 boolean unpair=false; 520 521 static int maxZipLevel=2; 522 523 static boolean quantizeQuality=false; 524 525 /*--------------------------------------------------------------*/ 526 /*---------------- Final Fields ----------------*/ 527 /*--------------------------------------------------------------*/ 528 529 private final FileFormat ffin1; 530 private final FileFormat ffin2; 531 532 private final FileFormat[] ffout; 533 534 /*--------------------------------------------------------------*/ 535 /*---------------- Common Fields ----------------*/ 536 /*--------------------------------------------------------------*/ 537 538 private PrintStream outstream=System.err; 539 public static boolean verbose=false; 540 public boolean errorState=false; 541 private boolean overwrite=false; 542 private boolean append=false; 543 544 } 545