1 package stream; 2 3 import java.io.IOException; 4 import java.io.OutputStream; 5 import java.util.ArrayList; 6 7 import fileIO.FileFormat; 8 import fileIO.ReadWrite; 9 import structures.ByteBuilder; 10 11 public class ReadStreamByteWriter extends ReadStreamWriter { 12 13 /*--------------------------------------------------------------*/ 14 /*---------------- Initialization ----------------*/ 15 /*--------------------------------------------------------------*/ 16 ReadStreamByteWriter(FileFormat ff, String qfname_, boolean read1_, int bufferSize, CharSequence header, boolean useSharedHeader)17 public ReadStreamByteWriter(FileFormat ff, String qfname_, boolean read1_, int bufferSize, CharSequence header, boolean useSharedHeader){ 18 super(ff, qfname_, read1_, bufferSize, header, false, buffered, useSharedHeader); 19 } 20 21 /*--------------------------------------------------------------*/ 22 /*---------------- Execution ----------------*/ 23 /*--------------------------------------------------------------*/ 24 25 @Override run()26 public void run() { 27 try { 28 run2(); 29 } catch (IOException e) { 30 finishedSuccessfully=false; 31 // e.printStackTrace(); 32 throw new RuntimeException(e); 33 } 34 } 35 run2()36 private void run2() throws IOException{ 37 writeHeader(); 38 39 final ByteBuilder bb=new ByteBuilder(65000); 40 final ByteBuilder bbq=(myQOutstream==null ? null : new ByteBuilder(65000)); 41 42 processJobs(bb, bbq); 43 finishWriting(bb, bbq); 44 } 45 46 /*--------------------------------------------------------------*/ 47 /*---------------- Outer Methods ----------------*/ 48 /*--------------------------------------------------------------*/ 49 writeHeader()50 private void writeHeader() throws IOException { 51 if(!OUTPUT_SAM && !OUTPUT_FASTQ && !OUTPUT_FASTA && !OUTPUT_ATTACHMENT && !OUTPUT_HEADER && !OUTPUT_ONELINE){ 52 if(OUTPUT_FASTR){ 53 myOutstream.write("#FASTR".getBytes()); 54 if(OUTPUT_INTERLEAVED){myOutstream.write("\tINT".getBytes());} 55 myOutstream.write('\n'); 56 }else{ 57 if(OUTPUT_INTERLEAVED){ 58 // assert(false) : OUTPUT_SAM+", "+OUTPUT_FASTQ+", "+OUTPUT_FASTA+", "+OUTPUT_ATTACHMENT+", "+OUTPUT_INTERLEAVED+", "+SITES_ONLY; 59 myOutstream.write("#INTERLEAVED\n".getBytes()); 60 } 61 if(SITES_ONLY){ 62 myOutstream.write(("#"+SiteScore.header()+"\n").getBytes()); 63 }else if(!OUTPUT_ATTACHMENT){ 64 myOutstream.write(("#"+Read.header()+"\n").getBytes()); 65 } 66 } 67 } 68 } 69 processJobs(final ByteBuilder bb, final ByteBuilder bbq)70 private void processJobs(final ByteBuilder bb, final ByteBuilder bbq) throws IOException{ 71 72 Job job=null; 73 while(job==null){ 74 try { 75 job=queue.take(); 76 // job.list=queue.take(); 77 } catch (InterruptedException e) { 78 // TODO Auto-generated catch block 79 e.printStackTrace(); 80 } 81 } 82 83 while(job!=null && !job.poison){ 84 85 final OutputStream os=job.outstream; 86 87 if(!job.isEmpty()){ 88 if(myQOutstream!=null){ 89 writeQuality(job, bbq); 90 } 91 92 if(OUTPUT_SAM){ 93 writeSam(job, bb, os); 94 }else if(SITES_ONLY){ 95 writeSites(job, bb, os); 96 }else if(OUTPUT_FASTQ){ 97 writeFastq(job, bb, os); 98 }else if(OUTPUT_FASTA){ 99 writeFasta(job, bb, os); 100 }else if(OUTPUT_ONELINE){ 101 writeOneline(job, bb, os); 102 }else if(OUTPUT_ATTACHMENT){ 103 writeAttachment(job, bb, os); 104 }else if(OUTPUT_HEADER){ 105 writeHeader(job, bb, os); 106 }else if(OUTPUT_FASTR){ 107 writeFastr(job, bb, os); 108 }else{ 109 writeBread(job, bb, os); 110 } 111 } 112 if(job.close){ 113 if(bb.length>0){ 114 os.write(bb.array, 0, bb.length); 115 bb.setLength(0); 116 } 117 assert(job.outstream!=null && job.outstream!=myOutstream); 118 ReadWrite.finishWriting(null, job.outstream, fname, allowSubprocess); //TODO: This should be job.fname 119 } 120 121 job=null; 122 while(job==null){ 123 try { 124 job=queue.take(); 125 } catch (InterruptedException e) { 126 // TODO Auto-generated catch block 127 e.printStackTrace(); 128 } 129 } 130 } 131 } 132 133 /** 134 * @throws IOException 135 * 136 */ finishWriting(final ByteBuilder bb, final ByteBuilder bbq)137 private void finishWriting(final ByteBuilder bb, final ByteBuilder bbq) throws IOException { 138 if(myOutstream!=null){ 139 if(bb.length>0){ 140 myOutstream.write(bb.array, 0, bb.length); 141 bb.setLength(0); 142 } 143 ReadWrite.finishWriting(null, myOutstream, fname, allowSubprocess); 144 } 145 if(myQOutstream!=null){ 146 if(bbq.length>0){ 147 myQOutstream.write(bbq.array, 0, bbq.length); 148 bbq.setLength(0); 149 } 150 ReadWrite.finishWriting(null, myQOutstream, qfname, allowSubprocess); 151 } 152 finishedSuccessfully=true; 153 } 154 155 /*--------------------------------------------------------------*/ 156 /*---------------- Inner Methods ----------------*/ 157 /*--------------------------------------------------------------*/ 158 writeQuality(final Job job, final ByteBuilder bbq)159 private void writeQuality(final Job job, final ByteBuilder bbq) throws IOException{ 160 bbq.setLength(0); 161 if(read1){ 162 for(final Read r : job.list){ 163 if(r!=null){ 164 { 165 bbq.append('>'); 166 bbq.append(r.id); 167 bbq.append('\n'); 168 if(r.bases!=null){toQualityB(r.quality, r.length(), FASTA_WRAP, bbq);} 169 bbq.append('\n'); 170 } 171 Read r2=r.mate; 172 if(OUTPUT_INTERLEAVED && r2!=null){ 173 bbq.append('>'); 174 bbq.append(r2.id); 175 bbq.append('\n'); 176 if(r2.bases!=null){toQualityB(r2.quality, r2.length(), FASTA_WRAP, bbq);} 177 bbq.append('\n'); 178 } 179 } 180 if(bbq.length>=32768 || true){ 181 myQOutstream.write(bbq.array, 0, bbq.length); 182 bbq.setLength(0); 183 } 184 } 185 }else{ 186 for(final Read r1 : job.list){ 187 if(r1!=null){ 188 final Read r2=r1.mate; 189 assert(r2!=null && r2.mate==r1 && r2!=r1) : r1.toText(false); 190 bbq.append('>'); 191 bbq.append(r2.id); 192 bbq.append('\n'); 193 if(r2.bases!=null){toQualityB(r2.quality, r2.length(), FASTA_WRAP, bbq);} 194 bbq.append('\n'); 195 } 196 if(bbq.length>=32768){ 197 myQOutstream.write(bbq.array, 0, bbq.length); 198 bbq.setLength(0); 199 } 200 } 201 } 202 203 // if(bbq.length>0){ 204 // myQOutstream.write(bbq.array, 0, bbq.length); 205 // bbq.setLength(0); 206 // } 207 } 208 209 /** 210 * @param job 211 * @param bb 212 * @param os 213 * @throws IOException 214 */ writeBread(Job job, ByteBuilder bb, OutputStream os)215 private void writeBread(Job job, ByteBuilder bb, OutputStream os) throws IOException { 216 if(read1){ 217 for(final Read r : job.list){ 218 if(r!=null){ 219 r.toText(true, bb).append('\n'); 220 readsWritten++; 221 basesWritten+=r.length(); 222 Read r2=r.mate; 223 if(OUTPUT_INTERLEAVED && r2!=null){ 224 r2.toText(true, bb).append('\n'); 225 readsWritten++; 226 basesWritten+=r2.length(); 227 } 228 229 } 230 if(bb.length>=32768){ 231 os.write(bb.array, 0, bb.length); 232 bb.setLength(0); 233 } 234 } 235 }else{ 236 for(final Read r1 : job.list){ 237 if(r1!=null){ 238 final Read r2=r1.mate; 239 // assert(r2!=null && r2.mate==r1 && r2!=r1) : r1.toText(false); 240 if(r2!=null){ 241 r2.toText(true, bb).append('\n'); 242 readsWritten++; 243 basesWritten+=r2.length(); 244 }else{ 245 //TODO os.print(".\n"); 246 } 247 } 248 if(bb.length>=32768){ 249 os.write(bb.array, 0, bb.length); 250 bb.setLength(0); 251 } 252 } 253 } 254 } 255 256 /** 257 * @param job 258 * @param bb 259 * @param os 260 * @throws IOException 261 */ writeAttachment(Job job, ByteBuilder bb, OutputStream os)262 private void writeAttachment(Job job, ByteBuilder bb, OutputStream os) throws IOException { 263 if(read1){ 264 for(final Read r : job.list){ 265 if(r!=null){ 266 if(r.obj!=null){bb.append(r.obj.toString()).nl();} 267 else if(r.samline!=null){r.samline.toBytes(bb).nl();} 268 readsWritten++; 269 Read r2=r.mate; 270 if(OUTPUT_INTERLEAVED && r2!=null){ 271 if(r2.obj!=null){bb.append(r2.obj.toString()).nl();} 272 else if(r2.samline!=null){r2.samline.toBytes(bb).nl();} 273 readsWritten++; 274 } 275 } 276 if(bb.length>=32768){ 277 os.write(bb.array, 0, bb.length); 278 bb.setLength(0); 279 } 280 } 281 }else{ 282 for(final Read r1 : job.list){ 283 if(r1!=null){ 284 final Read r2=r1.mate; 285 if(r2!=null){ 286 if(r2.obj!=null){bb.append(r2.obj.toString()).nl();} 287 else if(r2.samline!=null){r2.samline.toBytes(bb).nl();} 288 readsWritten++; 289 }else{ 290 // bb.append('.').append('\n'); 291 } 292 } 293 if(bb.length>=32768){ 294 os.write(bb.array, 0, bb.length); 295 bb.setLength(0); 296 } 297 } 298 } 299 } 300 301 /** 302 * @param job 303 * @param bb 304 * @param os 305 * @throws IOException 306 */ writeHeader(Job job, ByteBuilder bb, OutputStream os)307 private void writeHeader(Job job, ByteBuilder bb, OutputStream os) throws IOException { 308 if(read1){ 309 for(final Read r : job.list){ 310 if(r!=null){ 311 bb.append(r.id).append('\n'); 312 readsWritten++; 313 Read r2=r.mate; 314 if(OUTPUT_INTERLEAVED && r2!=null){ 315 bb.append(r2.id).append('\n'); 316 readsWritten++; 317 } 318 } 319 if(bb.length>=32768){ 320 os.write(bb.array, 0, bb.length); 321 bb.setLength(0); 322 } 323 } 324 }else{ 325 for(final Read r1 : job.list){ 326 if(r1!=null){ 327 final Read r2=r1.mate; 328 if(r2!=null){ 329 bb.append(r2.id).append('\n'); 330 readsWritten++; 331 }else{ 332 // bb.append('.').append('\n'); 333 } 334 } 335 if(bb.length>=32768){ 336 os.write(bb.array, 0, bb.length); 337 bb.setLength(0); 338 } 339 } 340 } 341 } 342 343 /** 344 * @param job 345 * @param bb 346 * @param os 347 * @throws IOException 348 */ writeFasta(Job job, ByteBuilder bb, OutputStream os)349 private void writeFasta(Job job, ByteBuilder bb, OutputStream os) throws IOException { 350 if(read1){ 351 for(final Read r : job.list){ 352 if(r!=null){ 353 r.toFasta(FASTA_WRAP, bb).append('\n'); 354 readsWritten++; 355 basesWritten+=r.length(); 356 Read r2=r.mate; 357 if(OUTPUT_INTERLEAVED && r2!=null){ 358 r2.toFasta(FASTA_WRAP, bb).append('\n'); 359 readsWritten++; 360 basesWritten+=r2.length(); 361 } 362 } 363 if(bb.length>=32768){ 364 os.write(bb.array, 0, bb.length); 365 bb.setLength(0); 366 } 367 } 368 }else{ 369 for(final Read r1 : job.list){ 370 if(r1!=null){ 371 final Read r2=r1.mate; 372 assert(ignorePairAssertions || (r2!=null && r2.mate==r1 && r2!=r1)) : "\n"+r1.toText(false)+"\n\n"+(r2==null ? "null" : r2.toText(false)+"\n"); 373 if(r2!=null){ 374 r2.toFasta(FASTA_WRAP, bb).append('\n'); 375 readsWritten++; 376 basesWritten+=r2.length(); 377 } 378 } 379 if(bb.length>=32768){ 380 os.write(bb.array, 0, bb.length); 381 bb.setLength(0); 382 } 383 } 384 } 385 } 386 387 /** 388 * @param job 389 * @param bb 390 * @param os 391 * @throws IOException 392 */ writeOneline(Job job, ByteBuilder bb, OutputStream os)393 private void writeOneline(Job job, ByteBuilder bb, OutputStream os) throws IOException { 394 if(read1){ 395 for(final Read r : job.list){ 396 if(r!=null){ 397 bb.append(r.id).append('\t').append(r.bases).append('\n'); 398 readsWritten++; 399 basesWritten+=r.length(); 400 Read r2=r.mate; 401 if(OUTPUT_INTERLEAVED && r2!=null){ 402 bb.append(r2.id).append('\t').append(r2.bases).append('\n'); 403 readsWritten++; 404 basesWritten+=r2.length(); 405 } 406 } 407 if(bb.length>=32768){ 408 os.write(bb.array, 0, bb.length); 409 bb.setLength(0); 410 } 411 } 412 }else{ 413 for(final Read r1 : job.list){ 414 if(r1!=null){ 415 final Read r2=r1.mate; 416 assert(ignorePairAssertions || (r2!=null && r2.mate==r1 && r2!=r1)) : "\n"+r1.toText(false)+"\n\n"+(r2==null ? "null" : r2.toText(false)+"\n"); 417 if(r2!=null){ 418 bb.append(r2.id).append('\t').append(r2.bases).append('\n'); 419 readsWritten++; 420 basesWritten+=r2.length(); 421 } 422 } 423 if(bb.length>=32768){ 424 os.write(bb.array, 0, bb.length); 425 bb.setLength(0); 426 } 427 } 428 } 429 } 430 431 /** 432 * @param job 433 * @param bb 434 * @param os 435 * @throws IOException 436 */ writeFastq(Job job, ByteBuilder bb, OutputStream os)437 private void writeFastq(Job job, ByteBuilder bb, OutputStream os) throws IOException { 438 if(read1){ 439 for(final Read r : job.list){ 440 if(r!=null){ 441 r.toFastq(bb).append('\n'); 442 readsWritten++; 443 basesWritten+=r.length(); 444 Read r2=r.mate; 445 if(OUTPUT_INTERLEAVED && r2!=null){ 446 r2.toFastq(bb).append('\n'); 447 readsWritten++; 448 basesWritten+=r2.length(); 449 } 450 } 451 if(bb.length>=32768){ 452 os.write(bb.array, 0, bb.length); 453 bb.setLength(0); 454 } 455 } 456 }else{ 457 for(final Read r1 : job.list){ 458 if(r1!=null){ 459 final Read r2=r1.mate; 460 assert(ignorePairAssertions || (r2!=null && r2.mate==r1 && r2!=r1)) : "\n"+r1.toText(false)+"\n\n"+(r2==null ? "null" : r2.toText(false)+"\n"); 461 if(r2!=null){ 462 r2.toFastq(bb).append('\n'); 463 readsWritten++; 464 basesWritten+=r2.length(); 465 } 466 } 467 if(bb.length>=32768){ 468 os.write(bb.array, 0, bb.length); 469 bb.setLength(0); 470 } 471 } 472 } 473 } 474 475 /** 476 * @param job 477 * @param bb 478 * @param os 479 * @throws IOException 480 */ writeFastr(Job job, ByteBuilder bb, OutputStream os)481 private void writeFastr(Job job, ByteBuilder bb, OutputStream os) throws IOException { 482 bb.append(job.list.size()).append('\n'); 483 if(read1){ 484 for(final Read r : job.list){ 485 bb.append(r.id).append('\n'); 486 Read r2=r.mate; 487 if(OUTPUT_INTERLEAVED && r2!=null){ 488 bb.append(r2.id).append('\n'); 489 } 490 } 491 for(final Read r : job.list){ 492 bb.append(r.bases).append('\n'); 493 readsWritten++; 494 basesWritten+=r.length(); 495 496 Read r2=r.mate; 497 if(OUTPUT_INTERLEAVED && r2!=null){ 498 bb.append(r2.bases).append('\n'); 499 readsWritten++; 500 basesWritten+=r2.length(); 501 } 502 } 503 for(final Read r : job.list){ 504 bb.appendQuality(r.quality).append('\n'); 505 Read r2=r.mate; 506 if(OUTPUT_INTERLEAVED && r2!=null){ 507 bb.appendQuality(r2.quality).append('\n'); 508 } 509 } 510 }else{ 511 for(final Read r1 : job.list){ 512 final Read r2=r1.mate; 513 bb.append(r2.id).append('\n'); 514 } 515 for(final Read r1 : job.list){ 516 final Read r2=r1.mate; 517 bb.append(r2.bases).append('\n'); 518 readsWritten++; 519 basesWritten+=r2.length(); 520 } 521 for(final Read r1 : job.list){ 522 final Read r2=r1.mate; 523 bb.appendQuality(r2.quality).append('\n'); 524 } 525 } 526 527 if(bb.length>=32768){ 528 os.write(bb.array, 0, bb.length); 529 bb.setLength(0); 530 } 531 } 532 533 /** 534 * @param job 535 * @param bb 536 * @param os 537 * @throws IOException 538 */ writeSites(Job job, ByteBuilder bb, OutputStream os)539 private void writeSites(Job job, ByteBuilder bb, OutputStream os) throws IOException { 540 assert(read1); 541 for(final Read r : job.list){ 542 Read r2=(r==null ? null : r.mate); 543 544 if(r!=null && r.sites!=null){ 545 r.toSites(bb).append('\n'); 546 547 readsWritten++; 548 basesWritten+=r.length(); 549 } 550 if(r2!=null){ 551 r2.toSites(bb).append('\n'); 552 553 readsWritten++; 554 basesWritten+=r2.length(); 555 } 556 if(bb.length>=32768){ 557 os.write(bb.array, 0, bb.length); 558 bb.setLength(0); 559 } 560 } 561 } 562 563 /** 564 * @param job 565 * @param bb 566 * @throws IOException 567 */ writeSam(Job job, ByteBuilder bb, OutputStream os)568 private void writeSam(Job job, ByteBuilder bb, OutputStream os) throws IOException { 569 570 assert(read1); 571 for(final Read r : job.list){ 572 Read r2=(r==null ? null : r.mate); 573 574 SamLine sl1=(r==null ? null : (USE_ATTACHED_SAMLINE && r.samline!=null ? r.samline : new SamLine(r, 0))); 575 SamLine sl2=(r2==null ? null : (USE_ATTACHED_SAMLINE && r2.samline!=null ? r2.samline : new SamLine(r2, 1))); 576 577 if(r!=null){ 578 579 if(verbose && r.numSites()>0){ 580 int ssnum=0; 581 final Read clone=r.clone(); 582 for(SiteScore ss : r.sites){ 583 584 clone.setFromSite(ss); 585 clone.setSecondary(true); 586 SamLine sl=new SamLine(clone, 0); 587 588 System.err.println("\n[*** ss"+ssnum+":\n"+ss+"\n*** clone: \n"+clone+"\n*** sl: \n"+sl+"\n***]\n"); 589 ssnum++; 590 } 591 } 592 593 assert(!ASSERT_CIGAR || !r.mapped() || sl1.cigar!=null) : r; 594 sl1.toBytes(bb).append('\n'); 595 596 readsWritten++; 597 basesWritten+=r.length(); 598 ArrayList<SiteScore> list=r.sites; 599 if(OUTPUT_SAM_SECONDARY_ALIGNMENTS && list!=null && list.size()>1){ 600 final Read clone=r.clone(); 601 for(int i=1; i<list.size(); i++){ 602 SiteScore ss=list.get(i); 603 clone.match=null; 604 clone.setFromSite(ss); 605 clone.setSecondary(true); 606 607 // System.err.println(r.numericID+": "+(ss.match==null ? "null" : new String(ss.match))); 608 609 // assert(false) : r.mapScore+"\n"+ss.header()+"\n"+r.sites+"\n"; 610 SamLine sl=new SamLine(clone, 0); 611 assert(!sl.primary()); 612 // sl.setPrimary(false); 613 614 615 assert(!ASSERT_CIGAR || sl.cigar!=null) : r; 616 617 sl.toBytes(bb).append('\n'); 618 619 // readsWritten++; 620 // basesWritten+=r.length(); 621 } 622 } 623 } 624 if(r2!=null){ 625 assert(!ASSERT_CIGAR || !r2.mapped() || sl2.cigar!=null) : r2; 626 if(!SamLine.KEEP_NAMES && sl1!=null && ((sl2.qname==null) || !sl2.qname.equals(sl1.qname))){ 627 sl2.qname=sl1.qname; 628 } 629 sl2.toBytes(bb).append('\n'); 630 631 readsWritten++; 632 basesWritten+=r2.length(); 633 634 ArrayList<SiteScore> list=r2.sites; 635 if(OUTPUT_SAM_SECONDARY_ALIGNMENTS && list!=null && list.size()>1){ 636 final Read clone=r2.clone(); 637 for(int i=1; i<list.size(); i++){ 638 SiteScore ss=list.get(i); 639 clone.match=null; 640 clone.setFromSite(ss); 641 clone.setSecondary(true); 642 // assert(false) : r.mapScore+"\n"+ss.header()+"\n"+r.list+"\n"; 643 SamLine sl=new SamLine(clone, 0); 644 assert(!sl.primary()); 645 // sl.setPrimary(false); 646 647 assert(!ASSERT_CIGAR || sl.cigar!=null) : r2; 648 if(!SamLine.KEEP_NAMES && sl1!=null && ((sl2.qname==null) || !sl2.qname.equals(sl1.qname))){ 649 sl2.qname=sl1.qname; 650 } 651 sl.toBytes(bb).append('\n'); 652 653 // readsWritten++; 654 // basesWritten+=r.length(); 655 } 656 } 657 } 658 if(bb.length>=32768){ 659 os.write(bb.array, 0, bb.length); 660 bb.setLength(0); 661 } 662 663 } 664 } 665 666 /*--------------------------------------------------------------*/ 667 /*---------------- Static Fields ----------------*/ 668 /*--------------------------------------------------------------*/ 669 670 private static final boolean buffered=true; 671 private static final boolean verbose=false; 672 673 } 674