1 package stream; 2 3 import java.util.ArrayList; 4 import java.util.concurrent.ArrayBlockingQueue; 5 6 import shared.KillSwitch; 7 import shared.Shared; 8 import shared.Tools; 9 import structures.ByteBuilder; 10 11 /** 12 * Allows output of reads to multiple different output streams. 13 * Each output stream is controlled by a buffer, 14 * which stores reads until there is a sufficient quantity to dump. 15 * 16 * @author Brian Bushnell 17 * @date May 14, 2019 18 * 19 */ 20 public abstract class BufferedMultiCros extends Thread { 21 22 /*--------------------------------------------------------------*/ 23 /*---------------- Initialization ----------------*/ 24 /*--------------------------------------------------------------*/ 25 26 /** Details in primary constructor */ BufferedMultiCros(String pattern1_, String pattern2_, boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_)27 public BufferedMultiCros(String pattern1_, String pattern2_, 28 boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_){ 29 this(pattern1_, pattern2_, overwrite_, append_, allowSubprocess_, useSharedHeader_, defaultFormat_, threaded_, DEFAULT_MAX_STREAMS); 30 } 31 32 /** 33 * Primary constructor. 34 * @param pattern1_ Name pattern for file 1; must contain % (required) 35 * @param pattern2_ Name pattern for file 2; must contain % (optional) 36 * @param overwrite_ Permission to overwrite 37 * @param append_ Permission to append to existing files (this should generally be false) 38 * @param allowSubprocess_ Allow subprocesses such as pigz, bgzip, or samtools 39 * @param useSharedHeader_ Print the stored header (from an input sam file) in all output sam files 40 * @param defaultFormat_ Assume files are in this format if they don't have a valid extension 41 * @param threaded_ Run this mcros in its own thread 42 * @param maxStreams_ Max allowed number of concurrent open streams 43 */ BufferedMultiCros(String pattern1_, String pattern2_, boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_, int maxStreams_)44 public BufferedMultiCros(String pattern1_, String pattern2_, 45 boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_, int maxStreams_){ 46 assert(pattern1_!=null && pattern1_.indexOf('%')>=0); 47 assert(pattern2_==null || pattern1_.indexOf('%')>=0); 48 49 //Perform # expansion for twin files 50 if(pattern2_==null && pattern1_.indexOf('#')>=0){ 51 pattern1=pattern1_.replaceFirst("#", "1"); 52 pattern2=pattern1_.replaceFirst("#", "2"); 53 }else{ 54 pattern1=pattern1_; 55 pattern2=pattern2_; 56 } 57 58 overwrite=overwrite_; 59 append=append_; 60 allowSubprocess=allowSubprocess_; 61 useSharedHeader=useSharedHeader_; 62 63 defaultFormat=defaultFormat_; 64 65 threaded=threaded_; 66 transferQueue=threaded ? new ArrayBlockingQueue<ArrayList<Read>>(8) : null; 67 maxStreams=maxStreams_; 68 69 memLimit=Tools.max(10000000, (long)(0.75*Shared.memAvailable())); 70 } 71 72 /*--------------------------------------------------------------*/ 73 /*---------------- Abstract Methods ----------------*/ 74 /*--------------------------------------------------------------*/ 75 76 /** True if no errors were encountered */ finishedSuccessfully()77 public abstract boolean finishedSuccessfully(); 78 79 /** 80 * Add a single read. Should not be used in threaded mode. 81 * @param r Read to add. 82 * @param name Name of destination buffer. 83 */ add(Read r, String name)84 public abstract void add(Read r, String name); 85 86 /** 87 * Dump all buffered reads to disk, except when minReadsToDump forbids it. 88 * @return Number of reads dumped. 89 */ dumpAll()90 abstract long dumpAll(); 91 92 /** 93 * Dump all residual reads to this stream. 94 * @param rosu Destination stream. 95 * @return Number of residual reads dumped. 96 */ dumpResidual(ConcurrentReadOutputStream rosu)97 public abstract long dumpResidual(ConcurrentReadOutputStream rosu); 98 99 /** Dump everything and close any open streams. */ closeInner()100 abstract long closeInner(); 101 102 /** Generate a report on how many reads went to each file */ report()103 public abstract ByteBuilder report(); 104 105 /*--------------------------------------------------------------*/ 106 /*---------------- Final Methods ----------------*/ 107 /*--------------------------------------------------------------*/ 108 109 /** Shut this down and perform any cleanup needed. */ close()110 public final void close(){ 111 if(threaded){poisonAndWait();} 112 else{closeInner();} 113 } 114 115 /** Primary file pattern */ fname()116 public final String fname(){return pattern1;} 117 118 /** Return true if this stream has detected an error */ errorState()119 public final boolean errorState(){ 120 return errorState; 121 } 122 123 /** 124 * Send a list of reads to an output buffer. 125 * The reads must have a name attached to the object field in order to be written. 126 */ add(ArrayList<Read> list)127 public final void add(ArrayList<Read> list) { 128 if(threaded){//Send to the transfer queue 129 try { 130 transferQueue.put(list); 131 } catch (InterruptedException e) { 132 // TODO Auto-generated catch block 133 e.printStackTrace(); 134 KillSwitch.kill(); 135 } 136 }else{//Add the reads from this thread 137 addToBuffers(list); 138 } 139 } 140 141 /** Send individual reads to their designated buffer */ addToBuffers(ArrayList<Read> list)142 private final void addToBuffers(ArrayList<Read> list) { 143 for(Read r : list){ 144 if(r.obj!=null){ 145 String name=(String)r.obj; 146 add(r, name); 147 } 148 } 149 } 150 151 /*--------------------------------------------------------------*/ 152 /*---------------- Threaded Methods ----------------*/ 153 /*--------------------------------------------------------------*/ 154 155 @Override 156 /** For threaded mode */ run()157 public final void run(){ 158 assert(threaded) : "This should only be called in threaded mode."; 159 try { 160 for(ArrayList<Read> list=transferQueue.take(); list!=poisonToken; list=transferQueue.take()){ 161 if(verbose){System.err.println("Got list; size=\"+transferQueue.size())");} 162 addToBuffers(list); 163 if(verbose){System.err.println("Added list; size="+transferQueue.size());} 164 } 165 } catch (InterruptedException e) { 166 e.printStackTrace(); 167 //Terminate JVM if something goes wrong 168 KillSwitch.kill(); 169 } 170 closeInner(); 171 } 172 173 /** Indicate that no more reads will be sent, for threaded mode */ poison()174 public final void poison(){ 175 assert(threaded) : "This should only be called in threaded mode."; 176 transferQueue.add(poisonToken); 177 } 178 179 /** Indicate that no more reads will be sent, for threaded mode */ poisonAndWait()180 public final void poisonAndWait(){ 181 assert(threaded) : "This should only be called in threaded mode."; 182 poison(); 183 waitForFinish(); 184 } 185 186 /** Wait for this object's thread to terminate */ waitForFinish()187 public final void waitForFinish(){ 188 assert(threaded); 189 if(verbose){System.err.println("Waiting for finish.");} 190 while(this.getState()!=Thread.State.TERMINATED){ 191 if(verbose){System.err.println("Attempting join.");} 192 try { 193 this.join(1000); 194 } catch (InterruptedException e) { 195 e.printStackTrace(); 196 } 197 } 198 } 199 200 /*--------------------------------------------------------------*/ 201 /*---------------- Fields ----------------*/ 202 /*--------------------------------------------------------------*/ 203 204 /** Output file patterns containing a % symbol */ 205 public final String pattern1, pattern2; 206 207 /** True if an error was encountered */ 208 boolean errorState=false; 209 210 /** File overwrite permission */ 211 final boolean overwrite; 212 213 /** File append permission */ 214 final boolean append; 215 216 /** Subprocess spawning permission (e.g., for pigz) */ 217 final boolean allowSubprocess; 218 219 /** Output file format, if unclear from file extension */ 220 final int defaultFormat; 221 222 /** Buffers for each ReadStreamWriter */ 223 int rswBuffers=1; 224 225 /** Print the shared header (for sam files) */ 226 final boolean useSharedHeader; 227 228 /** Dump everything if this limit is reached from buffered reads */ 229 long memLimit; 230 231 /** Allow this many active streams, for MCros3 */ 232 public final int maxStreams; 233 234 /** Dump a buffer once it holds this many reads */ 235 public int readsPerBuffer=2000; 236 237 /** Dump a buffer once it holds this many bytes (estimated) */ 238 public int bytesPerBuffer=3000000; 239 240 /** Never write files with fewer than this many reads */ 241 public long minReadsToDump=0; 242 243 /** Number of reads encountered that were not written */ 244 public long residualReads=0, residualBases=0; 245 246 /** Current number of buffered reads */ 247 long readsInFlight=0; 248 249 /** Current number of buffered bytes (estimated) */ 250 long bytesInFlight=0; 251 252 /** Used when MultiCros is run in threaded mode */ 253 private final ArrayBlockingQueue<ArrayList<Read>> transferQueue; 254 255 /** Signal to terminate when in threaded mode */ 256 private final ArrayList<Read> poisonToken=new ArrayList<Read>(0); 257 258 /** True if this object is intended to run in a separate thread */ 259 public final boolean threaded; 260 261 /** Use a LogLog to track cardinality for each output file */ 262 public boolean trackCardinality=false; 263 264 /*--------------------------------------------------------------*/ 265 /*---------------- Static Fields ----------------*/ 266 /*--------------------------------------------------------------*/ 267 268 public static final int DEFAULT_MAX_STREAMS=4; 269 public static boolean verbose=false; 270 271 } 272