package stream;

import java.util.ArrayList;

import fileIO.FileFormat;
import fileIO.ReadWrite;
import shared.Tools;
import structures.ListNum;

/**
 * Presents up to two underlying ConcurrentReadInputStreams as a single stream.
 * Each call to {@link #nextList()} fetches one list from every active underlying
 * stream, marks the reads of the second stream with pairnum 1, and links reads at
 * the same index as mates of each other.
 *
 * @author Brian Bushnell
 * @date Apr 3, 2015
 *
 */
public class DualCris extends ConcurrentReadInputStream {

	/**
	 * Test driver: reads every list from one or two input files and returns each
	 * list to the stream, reporting which pair numbers were observed in it.
	 *
	 * @param args args[0] is the first input file; args[1], if present, is the second.
	 */
	public static void main(String[] args){
		String a=args[0];
		String b=args.length>1 ? args[1] : null;
		FileFormat ff1=FileFormat.testInput(a, null, false);
		FileFormat ff2=(b==null ? null : FileFormat.testInput(b, null, false));
		DualCris cris=getReadInputStream(-1, false, ff1, ff2, null, null);
		cris.start();

		ListNum<Read> ln=cris.nextList();
		//nextList() returns null when no underlying stream produced a list, so guard the first access too.
		ArrayList<Read> reads=(ln!=null ? ln.list : null);

		boolean foundR1=false, foundR2=false;
		while(ln!=null && reads!=null && reads.size()>0){
			for(Read r1 : reads){
				Read r2=r1.mate;
				if(r1.pairnum()==0){foundR1=true;}
				else{foundR2=true;}
				if(r2!=null){
					if(r2.pairnum()==0){foundR1=true;}
					else{foundR2=true;}
				}
			}

			System.err.print(ln.id);

			cris.returnList(ln.id, foundR1, foundR2);
			foundR1=foundR2=false;
			ln=cris.nextList();
			reads=(ln!=null ? ln.list : null);
			System.err.print(",");
		}
		System.err.print("Finished.");
		//The loop may have ended because ln is null; in that case there is no list to return.
		if(ln!=null){
			cris.returnList(ln.id, foundR1, foundR2);
		}
		ReadWrite.closeStreams(cris);
	}

	/**
	 * Builds a DualCris from two independently opened single-file streams.
	 *
	 * @param maxReads Passed through to each underlying stream factory call.
	 * @param keepSamHeader Passed through to each underlying stream factory call.
	 * @param ff1 Format of the first input, or null for no first stream.
	 * @param ff2 Format of the second input, or null for no second stream.
	 * @param qf1 Passed through to the factory call for the first stream.
	 * @param qf2 Passed through to the factory call for the second stream.
	 * @return A new, unstarted DualCris wrapping the created streams.
	 */
	public static DualCris getReadInputStream(long maxReads, boolean keepSamHeader,
			FileFormat ff1, FileFormat ff2, String qf1, String qf2){
		ConcurrentReadInputStream cris1=(ff1==null ? null :
			ConcurrentReadInputStream.getReadInputStream(maxReads, keepSamHeader, ff1, null, qf1, null));
		ConcurrentReadInputStream cris2=(ff2==null ? null :
			ConcurrentReadInputStream.getReadInputStream(maxReads, keepSamHeader, ff2, null, qf2, null));
		return new DualCris(cris1, cris2);
	}

	/**
	 * @param cris1_ Stream supplying the first reads; may be null.
	 * @param cris2_ Stream supplying the second reads; may be null.
	 */
	public DualCris(ConcurrentReadInputStream cris1_, ConcurrentReadInputStream cris2_){
		cris1=cris1_;
		cris2=cris2_;
	}

	/**
	 * Fetches the next list from each active underlying stream and merges them.
	 * A stream that returns null is marked inactive. Reads from the second stream
	 * get pairnum 1. When both streams return a list, reads at the same index are
	 * linked as mates, and any surplus reads of the second list are appended to the
	 * first list.
	 *
	 * @return The merged list; the second stream's list if only it produced one;
	 * or null if neither stream produced a list.
	 */
	@Override
	public ListNum<Read> nextList() {

		ListNum<Read> ln1=null, ln2=null;
		if(cris1Active && cris1!=null){
			ln1=cris1.nextList();
			if(ln1==null){
				synchronized(this){
					cris1Active=false;
					System.err.println("\nSet cris1Active="+cris1Active);
				}
			}
		}
		if(cris2Active && cris2!=null){
			ln2=cris2.nextList();
			if(ln2!=null){
				//Callers in this file treat ListNum.list as nullable, so do not iterate it blindly.
				if(ln2.list!=null){
					for(Read r : ln2.list){r.setPairnum(1);}
				}
			}else{
				synchronized(this){
					cris2Active=false;
					System.err.println("\nSet cris2Active="+cris2Active);
				}
			}
		}

		if(ln1!=null && ln2!=null){
			final int size1=ln1.size(), size2=ln2.size();
			final int min=Tools.min(size1, size2);
			//Link reads at the same position as mates of each other.
			for(int i=0; i<min; i++){
				Read r1=ln1.get(i);
				Read r2=ln2.get(i);
				r1.mate=r2;
				r2.mate=r1;
			}
			//Surplus reads from the second list are carried along, unpaired.
			if(size2>size1){
				for(int i=size1; i<size2; i++){
					ln1.add(ln2.get(i));
				}
			}
		}else if(ln2!=null){
			ln1=ln2;
		}

		return ln1;
	}

	/**
	 * Not supported by this class; use {@link #returnList(long, boolean, boolean)}.
	 *
	 * @throws UnsupportedOperationException always.
	 */
	@Override
	public void returnList(long listNum, boolean poison) {
		throw new UnsupportedOperationException("Unsupported.");
	}

	/**
	 * Returns a list to each active underlying stream. A stream whose reads were
	 * not seen is given poison=true and is marked inactive.
	 *
	 * @param listNum Id of the list being returned, forwarded to both streams.
	 * @param foundR1 True if the list contained a read with pairnum 0.
	 * @param foundR2 True if the list contained a read with a nonzero pairnum.
	 */
	public void returnList(long listNum, boolean foundR1, boolean foundR2) {
		if(cris1!=null && cris1Active){
			cris1.returnList(listNum, !foundR1);
			if(!foundR1){cris1Active=false;}
		}
		if(cris2!=null && cris2Active){
			cris2.returnList(listNum, !foundR2);
			if(!foundR2){cris2Active=false;}
		}
	}

	/** Starts every non-null underlying stream and marks it active. */
	@Override
	public void start() {
		started=true;
		if(cris1!=null){
			cris1.start();
			cris1Active=true;
		}
		if(cris2!=null){
			cris2.start();
			cris2Active=true;
		}
	}

	/** Not intended to be called; fails an assertion when assertions are enabled. */
	@Override
	public void run() {assert(false);}

	/** Shuts down every non-null underlying stream and marks both inactive. */
	@Override
	public void shutdown() {
		if(cris1!=null){cris1.shutdown();}
		if(cris2!=null){cris2.shutdown();}
		cris1Active=cris2Active=false;
	}

	/** Restarts every non-null underlying stream and marks it active. */
	@Override
	public void restart() {
		if(cris1!=null){
			cris1.restart();
			cris1Active=true;
		}
		if(cris2!=null){
			cris2.restart();
			cris2Active=true;
		}
	}

	/** Closes every non-null underlying stream and marks both inactive. */
	@Override
	public void close() {
		if(cris1!=null){cris1.close();}
		if(cris2!=null){cris2.close();}
		cris1Active=cris2Active=false;
	}

	/**
	 * @return true if a second stream exists; otherwise whatever the first stream
	 * reports; false if there is no stream at all.
	 */
	@Override
	public boolean paired() {
		assert(cris1!=null || cris2!=null);
		if(cris2!=null){return true;}
		if(cris1!=null){return cris1.paired();}
		return false;
	}

	/** @return The producers of the first stream followed by those of the second. */
	@Override
	public Object[] producers() {
		ArrayList<Object> list=new ArrayList<Object>();
		if(cris1!=null){
			for(Object o : cris1.producers()){list.add(o);}
		}
		if(cris2!=null){
			for(Object o : cris2.producers()){list.add(o);}
		}
		return list.toArray();
	}

	/**
	 * Folds the error state of both underlying streams into this object's own flag.
	 *
	 * @return true if this object or either underlying stream has reported an error.
	 */
	@Override
	public boolean errorState() {
		if(cris1!=null){errorState|=cris1.errorState();}
		if(cris2!=null){errorState|=cris2.errorState();}
		return errorState;
	}

	/**
	 * Not supported by this class.
	 *
	 * @throws UnsupportedOperationException always.
	 */
	@Override
	public void setSampleRate(float rate, long seed) {
		throw new UnsupportedOperationException("Invalid.");
	}

	/** @return Sum of basesIn() over the non-null underlying streams. */
	@Override
	public long basesIn() {
		return (cris1==null ? 0 : cris1.basesIn())+(cris2==null ? 0 : cris2.basesIn());
	}

	/** @return Sum of readsIn() over the non-null underlying streams. */
	@Override
	public long readsIn() {
		return (cris1==null ? 0 : cris1.readsIn())+(cris2==null ? 0 : cris2.readsIn());
	}

	/** @return The value of this object's verbose flag. */
	@Override
	public boolean verbose() {
		return verbose;
	}

	/** Underlying stream for the first reads; may be null. */
	private final ConcurrentReadInputStream cris1;
	/** Underlying stream for the second reads; may be null. */
	private final ConcurrentReadInputStream cris2;
	/** Whether each underlying stream is still expected to produce lists. */
	private boolean cris1Active, cris2Active;
	/** Sticky error flag, updated by errorState(). */
	private boolean errorState=false;
	/** Value reported by verbose(); never modified in this class. */
	private boolean verbose=false;

}