1 package stream;
2 
3 import java.io.IOException;
4 import java.io.OutputStream;
5 import java.util.ArrayList;
6 
7 import fileIO.FileFormat;
8 import fileIO.ReadWrite;
9 import structures.ByteBuilder;
10 
11 public class ReadStreamByteWriter extends ReadStreamWriter {
12 
13 	/*--------------------------------------------------------------*/
14 	/*----------------        Initialization        ----------------*/
15 	/*--------------------------------------------------------------*/
16 
ReadStreamByteWriter(FileFormat ff, String qfname_, boolean read1_, int bufferSize, CharSequence header, boolean useSharedHeader)17 	public ReadStreamByteWriter(FileFormat ff, String qfname_, boolean read1_, int bufferSize, CharSequence header, boolean useSharedHeader){
18 		super(ff, qfname_, read1_, bufferSize, header, false, buffered, useSharedHeader);
19 	}
20 
21 	/*--------------------------------------------------------------*/
22 	/*----------------          Execution           ----------------*/
23 	/*--------------------------------------------------------------*/
24 
25 	@Override
run()26 	public void run() {
27 		try {
28 			run2();
29 		} catch (IOException e) {
30 			finishedSuccessfully=false;
31 //			e.printStackTrace();
32 			throw new RuntimeException(e);
33 		}
34 	}
35 
run2()36 	private void run2() throws IOException{
37 		writeHeader();
38 
39 		final ByteBuilder bb=new ByteBuilder(65000);
40 		final ByteBuilder bbq=(myQOutstream==null ? null : new ByteBuilder(65000));
41 
42 		processJobs(bb, bbq);
43 		finishWriting(bb, bbq);
44 	}
45 
46 	/*--------------------------------------------------------------*/
47 	/*----------------        Outer Methods         ----------------*/
48 	/*--------------------------------------------------------------*/
49 
writeHeader()50 	private void writeHeader() throws IOException {
51 		if(!OUTPUT_SAM && !OUTPUT_FASTQ && !OUTPUT_FASTA && !OUTPUT_ATTACHMENT && !OUTPUT_HEADER && !OUTPUT_ONELINE){
52 			if(OUTPUT_FASTR){
53 				myOutstream.write("#FASTR".getBytes());
54 				if(OUTPUT_INTERLEAVED){myOutstream.write("\tINT".getBytes());}
55 				myOutstream.write('\n');
56 			}else{
57 				if(OUTPUT_INTERLEAVED){
58 					//				assert(false) : OUTPUT_SAM+", "+OUTPUT_FASTQ+", "+OUTPUT_FASTA+", "+OUTPUT_ATTACHMENT+", "+OUTPUT_INTERLEAVED+", "+SITES_ONLY;
59 					myOutstream.write("#INTERLEAVED\n".getBytes());
60 				}
61 				if(SITES_ONLY){
62 					myOutstream.write(("#"+SiteScore.header()+"\n").getBytes());
63 				}else if(!OUTPUT_ATTACHMENT){
64 					myOutstream.write(("#"+Read.header()+"\n").getBytes());
65 				}
66 			}
67 		}
68 	}
69 
processJobs(final ByteBuilder bb, final ByteBuilder bbq)70 	private void processJobs(final ByteBuilder bb, final ByteBuilder bbq) throws IOException{
71 
72 		Job job=null;
73 		while(job==null){
74 			try {
75 				job=queue.take();
76 //				job.list=queue.take();
77 			} catch (InterruptedException e) {
78 				// TODO Auto-generated catch block
79 				e.printStackTrace();
80 			}
81 		}
82 
83 		while(job!=null && !job.poison){
84 
85 			final OutputStream os=job.outstream;
86 
87 			if(!job.isEmpty()){
88 				if(myQOutstream!=null){
89 					writeQuality(job, bbq);
90 				}
91 
92 				if(OUTPUT_SAM){
93 					writeSam(job, bb, os);
94 				}else if(SITES_ONLY){
95 					writeSites(job, bb, os);
96 				}else if(OUTPUT_FASTQ){
97 					writeFastq(job, bb, os);
98 				}else if(OUTPUT_FASTA){
99 					writeFasta(job, bb, os);
100 				}else if(OUTPUT_ONELINE){
101 					writeOneline(job, bb, os);
102 				}else if(OUTPUT_ATTACHMENT){
103 					writeAttachment(job, bb, os);
104 				}else if(OUTPUT_HEADER){
105 					writeHeader(job, bb, os);
106 				}else if(OUTPUT_FASTR){
107 					writeFastr(job, bb, os);
108 				}else{
109 					writeBread(job, bb, os);
110 				}
111 			}
112 			if(job.close){
113 				if(bb.length>0){
114 					os.write(bb.array, 0, bb.length);
115 					bb.setLength(0);
116 				}
117 				assert(job.outstream!=null && job.outstream!=myOutstream);
118 				ReadWrite.finishWriting(null, job.outstream, fname, allowSubprocess); //TODO:  This should be job.fname
119 			}
120 
121 			job=null;
122 			while(job==null){
123 				try {
124 					job=queue.take();
125 				} catch (InterruptedException e) {
126 					// TODO Auto-generated catch block
127 					e.printStackTrace();
128 				}
129 			}
130 		}
131 	}
132 
133 	/**
134 	 * @throws IOException
135 	 *
136 	 */
finishWriting(final ByteBuilder bb, final ByteBuilder bbq)137 	private void finishWriting(final ByteBuilder bb, final ByteBuilder bbq) throws IOException {
138 		if(myOutstream!=null){
139 			if(bb.length>0){
140 				myOutstream.write(bb.array, 0, bb.length);
141 				bb.setLength(0);
142 			}
143 			ReadWrite.finishWriting(null, myOutstream, fname, allowSubprocess);
144 		}
145 		if(myQOutstream!=null){
146 			if(bbq.length>0){
147 				myQOutstream.write(bbq.array, 0, bbq.length);
148 				bbq.setLength(0);
149 			}
150 			ReadWrite.finishWriting(null, myQOutstream, qfname, allowSubprocess);
151 		}
152 		finishedSuccessfully=true;
153 	}
154 
155 	/*--------------------------------------------------------------*/
156 	/*----------------        Inner Methods         ----------------*/
157 	/*--------------------------------------------------------------*/
158 
writeQuality(final Job job, final ByteBuilder bbq)159 	private void writeQuality(final Job job, final ByteBuilder bbq) throws IOException{
160 		bbq.setLength(0);
161 		if(read1){
162 			for(final Read r : job.list){
163 				if(r!=null){
164 					{
165 						bbq.append('>');
166 						bbq.append(r.id);
167 						bbq.append('\n');
168 						if(r.bases!=null){toQualityB(r.quality, r.length(), FASTA_WRAP, bbq);}
169 						bbq.append('\n');
170 					}
171 					Read r2=r.mate;
172 					if(OUTPUT_INTERLEAVED && r2!=null){
173 						bbq.append('>');
174 						bbq.append(r2.id);
175 						bbq.append('\n');
176 						if(r2.bases!=null){toQualityB(r2.quality, r2.length(), FASTA_WRAP,  bbq);}
177 						bbq.append('\n');
178 					}
179 				}
180 				if(bbq.length>=32768 || true){
181 					myQOutstream.write(bbq.array, 0, bbq.length);
182 					bbq.setLength(0);
183 				}
184 			}
185 		}else{
186 			for(final Read r1 : job.list){
187 				if(r1!=null){
188 					final Read r2=r1.mate;
189 					assert(r2!=null && r2.mate==r1 && r2!=r1) : r1.toText(false);
190 					bbq.append('>');
191 					bbq.append(r2.id);
192 					bbq.append('\n');
193 					if(r2.bases!=null){toQualityB(r2.quality, r2.length(), FASTA_WRAP,  bbq);}
194 					bbq.append('\n');
195 				}
196 				if(bbq.length>=32768){
197 					myQOutstream.write(bbq.array, 0, bbq.length);
198 					bbq.setLength(0);
199 				}
200 			}
201 		}
202 
203 //		if(bbq.length>0){
204 //			myQOutstream.write(bbq.array, 0, bbq.length);
205 //			bbq.setLength(0);
206 //		}
207 	}
208 
209 	/**
210 	 * @param job
211 	 * @param bb
212 	 * @param os
213 	 * @throws IOException
214 	 */
writeBread(Job job, ByteBuilder bb, OutputStream os)215 	private void writeBread(Job job, ByteBuilder bb, OutputStream os) throws IOException {
216 		if(read1){
217 			for(final Read r : job.list){
218 				if(r!=null){
219 					r.toText(true, bb).append('\n');
220 					readsWritten++;
221 					basesWritten+=r.length();
222 					Read r2=r.mate;
223 					if(OUTPUT_INTERLEAVED && r2!=null){
224 						r2.toText(true, bb).append('\n');
225 						readsWritten++;
226 						basesWritten+=r2.length();
227 					}
228 
229 				}
230 				if(bb.length>=32768){
231 					os.write(bb.array, 0, bb.length);
232 					bb.setLength(0);
233 				}
234 			}
235 		}else{
236 			for(final Read r1 : job.list){
237 				if(r1!=null){
238 					final Read r2=r1.mate;
239 //					assert(r2!=null && r2.mate==r1 && r2!=r1) : r1.toText(false);
240 					if(r2!=null){
241 						r2.toText(true, bb).append('\n');
242 						readsWritten++;
243 						basesWritten+=r2.length();
244 					}else{
245 						//TODO os.print(".\n");
246 					}
247 				}
248 				if(bb.length>=32768){
249 					os.write(bb.array, 0, bb.length);
250 					bb.setLength(0);
251 				}
252 			}
253 		}
254 	}
255 
256 	/**
257 	 * @param job
258 	 * @param bb
259 	 * @param os
260 	 * @throws IOException
261 	 */
writeAttachment(Job job, ByteBuilder bb, OutputStream os)262 	private void writeAttachment(Job job, ByteBuilder bb, OutputStream os) throws IOException {
263 		if(read1){
264 			for(final Read r : job.list){
265 				if(r!=null){
266 					if(r.obj!=null){bb.append(r.obj.toString()).nl();}
267 					else if(r.samline!=null){r.samline.toBytes(bb).nl();}
268 					readsWritten++;
269 					Read r2=r.mate;
270 					if(OUTPUT_INTERLEAVED && r2!=null){
271 						if(r2.obj!=null){bb.append(r2.obj.toString()).nl();}
272 						else if(r2.samline!=null){r2.samline.toBytes(bb).nl();}
273 						readsWritten++;
274 					}
275 				}
276 				if(bb.length>=32768){
277 					os.write(bb.array, 0, bb.length);
278 					bb.setLength(0);
279 				}
280 			}
281 		}else{
282 			for(final Read r1 : job.list){
283 				if(r1!=null){
284 					final Read r2=r1.mate;
285 					if(r2!=null){
286 						if(r2.obj!=null){bb.append(r2.obj.toString()).nl();}
287 						else if(r2.samline!=null){r2.samline.toBytes(bb).nl();}
288 						readsWritten++;
289 					}else{
290 //						bb.append('.').append('\n');
291 					}
292 				}
293 				if(bb.length>=32768){
294 					os.write(bb.array, 0, bb.length);
295 					bb.setLength(0);
296 				}
297 			}
298 		}
299 	}
300 
301 	/**
302 	 * @param job
303 	 * @param bb
304 	 * @param os
305 	 * @throws IOException
306 	 */
writeHeader(Job job, ByteBuilder bb, OutputStream os)307 	private void writeHeader(Job job, ByteBuilder bb, OutputStream os) throws IOException {
308 		if(read1){
309 			for(final Read r : job.list){
310 				if(r!=null){
311 					bb.append(r.id).append('\n');
312 					readsWritten++;
313 					Read r2=r.mate;
314 					if(OUTPUT_INTERLEAVED && r2!=null){
315 						bb.append(r2.id).append('\n');
316 						readsWritten++;
317 					}
318 				}
319 				if(bb.length>=32768){
320 					os.write(bb.array, 0, bb.length);
321 					bb.setLength(0);
322 				}
323 			}
324 		}else{
325 			for(final Read r1 : job.list){
326 				if(r1!=null){
327 					final Read r2=r1.mate;
328 					if(r2!=null){
329 						bb.append(r2.id).append('\n');
330 						readsWritten++;
331 					}else{
332 //						bb.append('.').append('\n');
333 					}
334 				}
335 				if(bb.length>=32768){
336 					os.write(bb.array, 0, bb.length);
337 					bb.setLength(0);
338 				}
339 			}
340 		}
341 	}
342 
343 	/**
344 	 * @param job
345 	 * @param bb
346 	 * @param os
347 	 * @throws IOException
348 	 */
writeFasta(Job job, ByteBuilder bb, OutputStream os)349 	private void writeFasta(Job job, ByteBuilder bb, OutputStream os) throws IOException {
350 		if(read1){
351 			for(final Read r : job.list){
352 				if(r!=null){
353 					r.toFasta(FASTA_WRAP, bb).append('\n');
354 					readsWritten++;
355 					basesWritten+=r.length();
356 					Read r2=r.mate;
357 					if(OUTPUT_INTERLEAVED && r2!=null){
358 						r2.toFasta(FASTA_WRAP, bb).append('\n');
359 						readsWritten++;
360 						basesWritten+=r2.length();
361 					}
362 				}
363 				if(bb.length>=32768){
364 					os.write(bb.array, 0, bb.length);
365 					bb.setLength(0);
366 				}
367 			}
368 		}else{
369 			for(final Read r1 : job.list){
370 				if(r1!=null){
371 					final Read r2=r1.mate;
372 					assert(ignorePairAssertions || (r2!=null && r2.mate==r1 && r2!=r1)) : "\n"+r1.toText(false)+"\n\n"+(r2==null ? "null" : r2.toText(false)+"\n");
373 					if(r2!=null){
374 						r2.toFasta(FASTA_WRAP, bb).append('\n');
375 						readsWritten++;
376 						basesWritten+=r2.length();
377 					}
378 				}
379 				if(bb.length>=32768){
380 					os.write(bb.array, 0, bb.length);
381 					bb.setLength(0);
382 				}
383 			}
384 		}
385 	}
386 
387 	/**
388 	 * @param job
389 	 * @param bb
390 	 * @param os
391 	 * @throws IOException
392 	 */
writeOneline(Job job, ByteBuilder bb, OutputStream os)393 	private void writeOneline(Job job, ByteBuilder bb, OutputStream os) throws IOException {
394 		if(read1){
395 			for(final Read r : job.list){
396 				if(r!=null){
397 					bb.append(r.id).append('\t').append(r.bases).append('\n');
398 					readsWritten++;
399 					basesWritten+=r.length();
400 					Read r2=r.mate;
401 					if(OUTPUT_INTERLEAVED && r2!=null){
402 						bb.append(r2.id).append('\t').append(r2.bases).append('\n');
403 						readsWritten++;
404 						basesWritten+=r2.length();
405 					}
406 				}
407 				if(bb.length>=32768){
408 					os.write(bb.array, 0, bb.length);
409 					bb.setLength(0);
410 				}
411 			}
412 		}else{
413 			for(final Read r1 : job.list){
414 				if(r1!=null){
415 					final Read r2=r1.mate;
416 					assert(ignorePairAssertions || (r2!=null && r2.mate==r1 && r2!=r1)) : "\n"+r1.toText(false)+"\n\n"+(r2==null ? "null" : r2.toText(false)+"\n");
417 					if(r2!=null){
418 						bb.append(r2.id).append('\t').append(r2.bases).append('\n');
419 						readsWritten++;
420 						basesWritten+=r2.length();
421 					}
422 				}
423 				if(bb.length>=32768){
424 					os.write(bb.array, 0, bb.length);
425 					bb.setLength(0);
426 				}
427 			}
428 		}
429 	}
430 
431 	/**
432 	 * @param job
433 	 * @param bb
434 	 * @param os
435 	 * @throws IOException
436 	 */
writeFastq(Job job, ByteBuilder bb, OutputStream os)437 	private void writeFastq(Job job, ByteBuilder bb, OutputStream os) throws IOException {
438 		if(read1){
439 			for(final Read r : job.list){
440 				if(r!=null){
441 					r.toFastq(bb).append('\n');
442 					readsWritten++;
443 					basesWritten+=r.length();
444 					Read r2=r.mate;
445 					if(OUTPUT_INTERLEAVED && r2!=null){
446 						r2.toFastq(bb).append('\n');
447 						readsWritten++;
448 						basesWritten+=r2.length();
449 					}
450 				}
451 				if(bb.length>=32768){
452 					os.write(bb.array, 0, bb.length);
453 					bb.setLength(0);
454 				}
455 			}
456 		}else{
457 			for(final Read r1 : job.list){
458 				if(r1!=null){
459 					final Read r2=r1.mate;
460 					assert(ignorePairAssertions || (r2!=null && r2.mate==r1 && r2!=r1)) : "\n"+r1.toText(false)+"\n\n"+(r2==null ? "null" : r2.toText(false)+"\n");
461 					if(r2!=null){
462 						r2.toFastq(bb).append('\n');
463 						readsWritten++;
464 						basesWritten+=r2.length();
465 					}
466 				}
467 				if(bb.length>=32768){
468 					os.write(bb.array, 0, bb.length);
469 					bb.setLength(0);
470 				}
471 			}
472 		}
473 	}
474 
475 	/**
476 	 * @param job
477 	 * @param bb
478 	 * @param os
479 	 * @throws IOException
480 	 */
writeFastr(Job job, ByteBuilder bb, OutputStream os)481 	private void writeFastr(Job job, ByteBuilder bb, OutputStream os) throws IOException {
482 		bb.append(job.list.size()).append('\n');
483 		if(read1){
484 			for(final Read r : job.list){
485 				bb.append(r.id).append('\n');
486 				Read r2=r.mate;
487 				if(OUTPUT_INTERLEAVED && r2!=null){
488 					bb.append(r2.id).append('\n');
489 				}
490 			}
491 			for(final Read r : job.list){
492 				bb.append(r.bases).append('\n');
493 				readsWritten++;
494 				basesWritten+=r.length();
495 
496 				Read r2=r.mate;
497 				if(OUTPUT_INTERLEAVED && r2!=null){
498 					bb.append(r2.bases).append('\n');
499 					readsWritten++;
500 					basesWritten+=r2.length();
501 				}
502 			}
503 			for(final Read r : job.list){
504 				bb.appendQuality(r.quality).append('\n');
505 				Read r2=r.mate;
506 				if(OUTPUT_INTERLEAVED && r2!=null){
507 					bb.appendQuality(r2.quality).append('\n');
508 				}
509 			}
510 		}else{
511 			for(final Read r1 : job.list){
512 				final Read r2=r1.mate;
513 				bb.append(r2.id).append('\n');
514 			}
515 			for(final Read r1 : job.list){
516 				final Read r2=r1.mate;
517 				bb.append(r2.bases).append('\n');
518 				readsWritten++;
519 				basesWritten+=r2.length();
520 			}
521 			for(final Read r1 : job.list){
522 				final Read r2=r1.mate;
523 				bb.appendQuality(r2.quality).append('\n');
524 			}
525 		}
526 
527 		if(bb.length>=32768){
528 			os.write(bb.array, 0, bb.length);
529 			bb.setLength(0);
530 		}
531 	}
532 
533 	/**
534 	 * @param job
535 	 * @param bb
536 	 * @param os
537 	 * @throws IOException
538 	 */
writeSites(Job job, ByteBuilder bb, OutputStream os)539 	private void writeSites(Job job, ByteBuilder bb, OutputStream os) throws IOException {
540 		assert(read1);
541 		for(final Read r : job.list){
542 			Read r2=(r==null ? null : r.mate);
543 
544 			if(r!=null && r.sites!=null){
545 				r.toSites(bb).append('\n');
546 
547 				readsWritten++;
548 				basesWritten+=r.length();
549 			}
550 			if(r2!=null){
551 				r2.toSites(bb).append('\n');
552 
553 				readsWritten++;
554 				basesWritten+=r2.length();
555 			}
556 			if(bb.length>=32768){
557 				os.write(bb.array, 0, bb.length);
558 				bb.setLength(0);
559 			}
560 		}
561 	}
562 
563 	/**
564 	 * @param job
565 	 * @param bb
566 	 * @throws IOException
567 	 */
writeSam(Job job, ByteBuilder bb, OutputStream os)568 	private void writeSam(Job job, ByteBuilder bb, OutputStream os) throws IOException {
569 
570 		assert(read1);
571 		for(final Read r : job.list){
572 			Read r2=(r==null ? null : r.mate);
573 
574 			SamLine sl1=(r==null ? null : (USE_ATTACHED_SAMLINE && r.samline!=null ? r.samline : new SamLine(r, 0)));
575 			SamLine sl2=(r2==null ? null : (USE_ATTACHED_SAMLINE && r2.samline!=null ? r2.samline : new SamLine(r2, 1)));
576 
577 			if(r!=null){
578 
579 				if(verbose && r.numSites()>0){
580 					int ssnum=0;
581 					final Read clone=r.clone();
582 					for(SiteScore ss : r.sites){
583 
584 						clone.setFromSite(ss);
585 						clone.setSecondary(true);
586 						SamLine sl=new SamLine(clone, 0);
587 
588 						System.err.println("\n[*** ss"+ssnum+":\n"+ss+"\n*** clone: \n"+clone+"\n*** sl: \n"+sl+"\n***]\n");
589 						ssnum++;
590 					}
591 				}
592 
593 				assert(!ASSERT_CIGAR || !r.mapped() || sl1.cigar!=null) : r;
594 				sl1.toBytes(bb).append('\n');
595 
596 				readsWritten++;
597 				basesWritten+=r.length();
598 				ArrayList<SiteScore> list=r.sites;
599 				if(OUTPUT_SAM_SECONDARY_ALIGNMENTS && list!=null && list.size()>1){
600 					final Read clone=r.clone();
601 					for(int i=1; i<list.size(); i++){
602 						SiteScore ss=list.get(i);
603 						clone.match=null;
604 						clone.setFromSite(ss);
605 						clone.setSecondary(true);
606 
607 //						System.err.println(r.numericID+": "+(ss.match==null ? "null" : new String(ss.match)));
608 
609 //						assert(false) : r.mapScore+"\n"+ss.header()+"\n"+r.sites+"\n";
610 						SamLine sl=new SamLine(clone, 0);
611 						assert(!sl.primary());
612 //						sl.setPrimary(false);
613 
614 
615 						assert(!ASSERT_CIGAR || sl.cigar!=null) : r;
616 
617 						sl.toBytes(bb).append('\n');
618 
619 //						readsWritten++;
620 //						basesWritten+=r.length();
621 					}
622 				}
623 			}
624 			if(r2!=null){
625 				assert(!ASSERT_CIGAR || !r2.mapped() || sl2.cigar!=null) : r2;
626 				if(!SamLine.KEEP_NAMES && sl1!=null && ((sl2.qname==null) || !sl2.qname.equals(sl1.qname))){
627 					sl2.qname=sl1.qname;
628 				}
629 				sl2.toBytes(bb).append('\n');
630 
631 				readsWritten++;
632 				basesWritten+=r2.length();
633 
634 				ArrayList<SiteScore> list=r2.sites;
635 				if(OUTPUT_SAM_SECONDARY_ALIGNMENTS && list!=null && list.size()>1){
636 					final Read clone=r2.clone();
637 					for(int i=1; i<list.size(); i++){
638 						SiteScore ss=list.get(i);
639 						clone.match=null;
640 						clone.setFromSite(ss);
641 						clone.setSecondary(true);
642 //						assert(false) : r.mapScore+"\n"+ss.header()+"\n"+r.list+"\n";
643 						SamLine sl=new SamLine(clone, 0);
644 						assert(!sl.primary());
645 //						sl.setPrimary(false);
646 
647 						assert(!ASSERT_CIGAR || sl.cigar!=null) : r2;
648 						if(!SamLine.KEEP_NAMES && sl1!=null && ((sl2.qname==null) || !sl2.qname.equals(sl1.qname))){
649 							sl2.qname=sl1.qname;
650 						}
651 						sl.toBytes(bb).append('\n');
652 
653 //						readsWritten++;
654 //						basesWritten+=r.length();
655 					}
656 				}
657 			}
658 			if(bb.length>=32768){
659 				os.write(bb.array, 0, bb.length);
660 				bb.setLength(0);
661 			}
662 
663 		}
664 	}
665 
666 	/*--------------------------------------------------------------*/
667 	/*----------------        Static Fields         ----------------*/
668 	/*--------------------------------------------------------------*/
669 
670 	private static final boolean buffered=true;
671 	private static final boolean verbose=false;
672 
673 }
674