1 package stream;
2 
3 import java.util.ArrayList;
4 import java.util.concurrent.ArrayBlockingQueue;
5 
6 import shared.KillSwitch;
7 import shared.Shared;
8 import shared.Tools;
9 import structures.ByteBuilder;
10 
11 /**
12  * Allows output of reads to multiple different output streams.
13  * Each output stream is controlled by a buffer,
14  * which stores reads until there is a sufficient quantity to dump.
15  *
16  * @author Brian Bushnell
17  * @date May 14, 2019
18  *
19  */
20 public abstract class BufferedMultiCros extends Thread {
21 
22 	/*--------------------------------------------------------------*/
23 	/*----------------        Initialization        ----------------*/
24 	/*--------------------------------------------------------------*/
25 
26 	/** Details in primary constructor */
BufferedMultiCros(String pattern1_, String pattern2_, boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_)27 	public BufferedMultiCros(String pattern1_, String pattern2_,
28 			boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_){
29 		this(pattern1_, pattern2_, overwrite_, append_, allowSubprocess_, useSharedHeader_, defaultFormat_, threaded_, DEFAULT_MAX_STREAMS);
30 	}
31 
32 	/**
33 	 * Primary constructor.
34 	 * @param pattern1_ Name pattern for file 1; must contain % (required)
35 	 * @param pattern2_ Name pattern for file 2; must contain % (optional)
36 	 * @param overwrite_ Permission to overwrite
37 	 * @param append_ Permission to append to existing files (this should generally be false)
38 	 * @param allowSubprocess_ Allow subprocesses such as pigz, bgzip, or samtools
39 	 * @param useSharedHeader_ Print the stored header (from an input sam file) in all output sam files
40 	 * @param defaultFormat_ Assume files are in this format if they don't have a valid extension
41 	 * @param threaded_ Run this mcros in its own thread
42 	 * @param maxStreams_ Max allowed number of concurrent open streams
43 	 */
BufferedMultiCros(String pattern1_, String pattern2_, boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_, int maxStreams_)44 	public BufferedMultiCros(String pattern1_, String pattern2_,
45 			boolean overwrite_, boolean append_, boolean allowSubprocess_, boolean useSharedHeader_, int defaultFormat_, boolean threaded_, int maxStreams_){
46 		assert(pattern1_!=null && pattern1_.indexOf('%')>=0);
47 		assert(pattern2_==null || pattern1_.indexOf('%')>=0);
48 
49 		//Perform # expansion for twin files
50 		if(pattern2_==null && pattern1_.indexOf('#')>=0){
51 			pattern1=pattern1_.replaceFirst("#", "1");
52 			pattern2=pattern1_.replaceFirst("#", "2");
53 		}else{
54 			pattern1=pattern1_;
55 			pattern2=pattern2_;
56 		}
57 
58 		overwrite=overwrite_;
59 		append=append_;
60 		allowSubprocess=allowSubprocess_;
61 		useSharedHeader=useSharedHeader_;
62 
63 		defaultFormat=defaultFormat_;
64 
65 		threaded=threaded_;
66 		transferQueue=threaded ? new ArrayBlockingQueue<ArrayList<Read>>(8) : null;
67 		maxStreams=maxStreams_;
68 
69 		memLimit=Tools.max(10000000, (long)(0.75*Shared.memAvailable()));
70 	}
71 
72 	/*--------------------------------------------------------------*/
73 	/*----------------       Abstract Methods       ----------------*/
74 	/*--------------------------------------------------------------*/
75 
76 	/** True if no errors were encountered */
finishedSuccessfully()77 	public abstract boolean finishedSuccessfully();
78 
79 	/**
80 	 * Add a single read.  Should not be used in threaded mode.
81 	 * @param r Read to add.
82 	 * @param name Name of destination buffer.
83 	 */
add(Read r, String name)84 	public abstract void add(Read r, String name);
85 
86 	/**
87 	 * Dump all buffered reads to disk, except when minReadsToDump forbids it.
88 	 * @return Number of reads dumped.
89 	 */
dumpAll()90 	abstract long dumpAll();
91 
92 	/**
93 	 * Dump all residual reads to this stream.
94 	 * @param rosu Destination stream.
95 	 * @return Number of residual reads dumped.
96 	 */
dumpResidual(ConcurrentReadOutputStream rosu)97 	public abstract long dumpResidual(ConcurrentReadOutputStream rosu);
98 
99 	/** Dump everything and close any open streams. */
closeInner()100 	abstract long closeInner();
101 
102 	/** Generate a report on how many reads went to each file */
report()103 	public abstract ByteBuilder report();
104 
105 	/*--------------------------------------------------------------*/
106 	/*----------------        Final Methods         ----------------*/
107 	/*--------------------------------------------------------------*/
108 
109 	/** Shut this down and perform any cleanup needed. */
close()110 	public final void close(){
111 		if(threaded){poisonAndWait();}
112 		else{closeInner();}
113 	}
114 
115 	/** Primary file pattern */
fname()116 	public final String fname(){return pattern1;}
117 
118 	/** Return true if this stream has detected an error */
errorState()119 	public final boolean errorState(){
120 		return errorState;
121 	}
122 
123 	/**
124 	 * Send a list of reads to an output buffer.
125 	 * The reads must have a name attached to the object field in order to be written.
126 	 */
add(ArrayList<Read> list)127 	public final void add(ArrayList<Read> list) {
128 		if(threaded){//Send to the transfer queue
129 			try {
130 				transferQueue.put(list);
131 			} catch (InterruptedException e) {
132 				// TODO Auto-generated catch block
133 				e.printStackTrace();
134 				KillSwitch.kill();
135 			}
136 		}else{//Add the reads from this thread
137 			addToBuffers(list);
138 		}
139 	}
140 
141 	/** Send individual reads to their designated buffer */
addToBuffers(ArrayList<Read> list)142 	private final void addToBuffers(ArrayList<Read> list) {
143 		for(Read r : list){
144 			if(r.obj!=null){
145 				String name=(String)r.obj;
146 				add(r, name);
147 			}
148 		}
149 	}
150 
151 	/*--------------------------------------------------------------*/
152 	/*----------------       Threaded Methods       ----------------*/
153 	/*--------------------------------------------------------------*/
154 
155 	@Override
156 	/** For threaded mode */
run()157 	public final void run(){
158 		assert(threaded) : "This should only be called in threaded mode.";
159 		try {
160 			for(ArrayList<Read> list=transferQueue.take(); list!=poisonToken; list=transferQueue.take()){
161 				if(verbose){System.err.println("Got list; size=\"+transferQueue.size())");}
162 				addToBuffers(list);
163 				if(verbose){System.err.println("Added list; size="+transferQueue.size());}
164 			}
165 		} catch (InterruptedException e) {
166 			e.printStackTrace();
167 			//Terminate JVM if something goes wrong
168 			KillSwitch.kill();
169 		}
170 		closeInner();
171 	}
172 
173 	/** Indicate that no more reads will be sent, for threaded mode */
poison()174 	public final void poison(){
175 		assert(threaded) : "This should only be called in threaded mode.";
176 		transferQueue.add(poisonToken);
177 	}
178 
179 	/** Indicate that no more reads will be sent, for threaded mode */
poisonAndWait()180 	public final void poisonAndWait(){
181 		assert(threaded) : "This should only be called in threaded mode.";
182 		poison();
183 		waitForFinish();
184 	}
185 
186 	/** Wait for this object's thread to terminate */
waitForFinish()187 	public final void waitForFinish(){
188 		assert(threaded);
189 		if(verbose){System.err.println("Waiting for finish.");}
190 		while(this.getState()!=Thread.State.TERMINATED){
191 			if(verbose){System.err.println("Attempting join.");}
192 			try {
193 				this.join(1000);
194 			} catch (InterruptedException e) {
195 				e.printStackTrace();
196 			}
197 		}
198 	}
199 
200 	/*--------------------------------------------------------------*/
201 	/*----------------             Fields           ----------------*/
202 	/*--------------------------------------------------------------*/
203 
204 	/** Output file patterns containing a % symbol */
205 	public final String pattern1, pattern2;
206 
207 	/** True if an error was encountered */
208 	boolean errorState=false;
209 
210 	/** File overwrite permission */
211 	final boolean overwrite;
212 
213 	/** File append permission */
214 	final boolean append;
215 
216 	/** Subprocess spawning permission (e.g., for pigz) */
217 	final boolean allowSubprocess;
218 
219 	/** Output file format, if unclear from file extension */
220 	final int defaultFormat;
221 
222 	/** Buffers for each ReadStreamWriter */
223 	int rswBuffers=1;
224 
225 	/** Print the shared header (for sam files) */
226 	final boolean useSharedHeader;
227 
228 	/** Dump everything if this limit is reached from buffered reads */
229 	long memLimit;
230 
231 	/** Allow this many active streams, for MCros3 */
232 	public final int maxStreams;
233 
234 	/** Dump a buffer once it holds this many reads */
235 	public int readsPerBuffer=2000;
236 
237 	/** Dump a buffer once it holds this many bytes (estimated) */
238 	public int bytesPerBuffer=3000000;
239 
240 	/** Never write files with fewer than this many reads */
241 	public long minReadsToDump=0;
242 
243 	/** Number of reads encountered that were not written */
244 	public long residualReads=0, residualBases=0;
245 
246 	/** Current number of buffered reads */
247 	long readsInFlight=0;
248 
249 	/** Current number of buffered bytes (estimated) */
250 	long bytesInFlight=0;
251 
252 	/** Used when MultiCros is run in threaded mode */
253 	private final ArrayBlockingQueue<ArrayList<Read>> transferQueue;
254 
255 	/** Signal to terminate when in threaded mode */
256 	private final ArrayList<Read> poisonToken=new ArrayList<Read>(0);
257 
258 	/** True if this object is intended to run in a separate thread */
259 	public final boolean threaded;
260 
261 	/** Use a LogLog to track cardinality for each output file */
262 	public boolean trackCardinality=false;
263 
264 	/*--------------------------------------------------------------*/
265 	/*----------------        Static Fields         ----------------*/
266 	/*--------------------------------------------------------------*/
267 
268 	public static final int DEFAULT_MAX_STREAMS=4;
269 	public static boolean verbose=false;
270 
271 }
272