1 package fileIO;
2 import java.io.InputStream;
3 import java.util.Arrays;
4 import java.util.concurrent.ArrayBlockingQueue;
5 
6 import shared.Timer;
7 import shared.Tools;
8 
9 
10 /**
11  * Runs a ByteFile1 in a separate thread.  Can speed up disk reading, particularly of compressed files, at cost of slightly more work done.
12  * Drop-in compatible with ByteFile1.
13  * @author Brian Bushnell
14  * @date Sep 23, 2013
15  *
16  */
17 public final class ByteFile2 extends ByteFile {
18 
19 
main(String[] args)20 	public static void main(String[] args){
21 		ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true);
22 		long first=0, last=100;
23 		boolean speedtest=false;
24 		if(args.length>1){
25 			if(args[1].equalsIgnoreCase("speedtest")){
26 				speedtest=true;
27 				first=0;
28 				last=Long.MAX_VALUE;
29 			}else{
30 				first=Integer.parseInt(args[1]);
31 				last=first+100;
32 			}
33 		}
34 		if(args.length>2){
35 			last=Integer.parseInt(args[2]);
36 		}
37 		speedtest(tf, first, last, !speedtest);
38 
39 		tf.close();
40 		tf.reset();
41 		tf.close();
42 	}
43 
speedtest(ByteFile2 tf, long first, long last, boolean reprint)44 	private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){
45 		Timer t=new Timer();
46 		long lines=0;
47 		long bytes=0;
48 		for(long i=0; i<first; i++){tf.nextLine();}
49 		if(reprint){
50 			for(long i=first; i<last; i++){
51 				byte[] s=tf.nextLine();
52 				if(s==null){break;}
53 
54 				lines++;
55 				bytes+=s.length;
56 				System.out.println(new String(s));
57 			}
58 
59 			System.err.println("\n");
60 			System.err.println("Lines: "+lines);
61 			System.err.println("Bytes: "+bytes);
62 		}else{
63 			for(long i=first; i<last; i++){
64 				byte[] s=tf.nextLine();
65 				if(s==null){break;}
66 				lines++;
67 				bytes+=s.length;
68 			}
69 		}
70 		t.stop();
71 
72 		if(!reprint){
73 			System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8));
74 		}
75 	}
76 
77 //	public ByteFile2(String name()){this(name(), false);}
78 
ByteFile2(String fname, boolean allowSubprocess_)79 	public ByteFile2(String fname, boolean allowSubprocess_){
80 		this(FileFormat.testInput(fname, FileFormat.TEXT, null, allowSubprocess_, false));
81 	}
82 
ByteFile2(FileFormat ff)83 	public ByteFile2(FileFormat ff){
84 		super(ff);
85 		if(verbose){System.err.println("ByteFile2("+ff+")");}
86 		open();
87 	}
88 
89 	@Override
reset()90 	public final void reset(){
91 		close();
92 		open();
93 		superReset();
94 	}
95 
96 	@Override
close()97 	public synchronized final boolean close(){
98 		if(verbose){System.err.println("ByteFile2("+name()+").close()");}
99 		if(isOpen()){
100 //			errorState|=ReadWrite.killProcess(name());
101 			thread.shutdown();
102 			while(thread.getState()!=Thread.State.TERMINATED){
103 				try {
104 					thread.join();
105 				} catch (InterruptedException e) {
106 					// TODO Auto-generated catch block
107 					e.printStackTrace();
108 				}
109 			}
110 			thread.bf1.close();
111 		}
112 		thread=null;
113 		currentList=null;
114 		currentLoc=0;
115 //		assert(numIn==numOut) : numIn+", "+numOut;
116 		pushBack=null;
117 		if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);}
118 		return errorState;
119 	}
120 
121 	@Override
nextLine()122 	public final byte[] nextLine(){
123 
124 		if(pushBack!=null){//Commenting out does not seem to improve speed here.
125 			byte[] temp=pushBack;
126 			pushBack=null;
127 			return temp;
128 		}
129 
130 //		if(verbose){System.err.println("Reading line.");}
131 //		byte[] r=null;
132 
133 		byte[][] temp=currentList;
134 		int tempLoc=currentLoc;
135 
136 		if(temp==null || tempLoc>=temp.length || temp[tempLoc]==null){
137 			boolean b=getBuffer();
138 			if(!b){
139 				if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");}
140 				return null;
141 			}
142 			temp=currentList;
143 			tempLoc=currentLoc;
144 			if(temp==null || temp==poison || temp[tempLoc]==null){
145 				return null;
146 			}
147 		}
148 
149 		//TODO: This is a race condition; currentList can be changed to null.  A defensive copy could be created.
150 		//Note that I read the above warning and added "temp" and "temploc" but I'm not sure if that fixed anything.
151 		assert(temp!=null && temp!=poison);
152 		assert(tempLoc<temp.length);
153 		assert(temp[tempLoc]!=null);
154 		byte[] r=temp[tempLoc];
155 		assert(r!=null);
156 		currentLoc++;
157 //		numOut++;
158 		return r;
159 	}
160 
161 	private boolean getBuffer(){
162 		if(verbose2){System.err.println("Getting new buffer.");}
163 		currentLoc=0;
164 		final BF1Thread bft=thread;
165 		if(bft==null){
166 			currentList=null;
167 			if(verbose2){System.err.println("No buffers available.  thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));}
168 			return false;
169 		}
170 		if(currentList==poison){
171 			if(verbose2){System.err.println("A: Current list is poison.");}
172 			return false;
173 		}
174 		if(currentList!=null){
175 			Arrays.fill(currentList, null); //MUST be done or lines get recycled at end of file.
176 			while(currentList!=null){
177 				try {
178 					if(verbose2){System.err.println("adding to qEmpty list size "+currentList.length+"\n"+toString(currentList));}
179 					bft.qEmpty.put(currentList);
180 					currentList=null;
181 				} catch (InterruptedException e) {
182 					// TODO Auto-generated catch block
183 					e.printStackTrace();
184 				}
185 			}
186 		}
187 		assert(currentList==null);
188 		while(currentList==null){
189 			try {
190 				assert(bft!=null);
191 				if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());}
192 				currentList=bft.qFull.take();
193 			} catch (InterruptedException e) {
194 				// TODO Auto-generated catch block
195 				e.printStackTrace();
196 			}
197 		}
198 		if(verbose2){
199 			if(currentList==poison){
200 				System.err.println("B: Current list is poison.");
201 			}else{
202 				System.err.println("getBuffer fetched a new buffer of size "+currentList.length);
203 			}
204 		}
205 		return currentList!=poison;
206 	}
207 
208 	private final synchronized BF1Thread open(){
209 		if(verbose2){System.err.println("ByteFile2("+name()+").open()");}
210 		assert(thread==null);
211 		currentList=null;
212 		currentLoc=0;
213 //		numIn=0;
214 //		numOut=0;
215 		thread=new BF1Thread(ff);
216 		thread.start();
217 		return thread;
218 	}
219 
220 	private class BF1Thread extends Thread{
221 
222 //		public BF1Thread(String fname){
223 //			bf1=new ByteFile1(fname, false, allowSubprocess);
224 //			qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
225 //			qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
226 //			for(int i=0; i<buffs; i++){
227 //				try {
228 //					qEmpty.put(new byte[bufflen][]);
229 //				} catch (InterruptedException e) {
230 //					// TODO Auto-generated catch block
231 //					e.printStackTrace();
232 //				}
233 //			}
234 //		}
235 
236 		public BF1Thread(FileFormat ff){
237 			bf1=new ByteFile1(ff);
238 			qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
239 			qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
240 			for(int i=0; i<buffs; i++){
241 				try {
242 					qEmpty.put(new byte[bufflen][]);
243 				} catch (InterruptedException e) {
244 					// TODO Auto-generated catch block
245 					e.printStackTrace();
246 				}
247 			}
248 		}
249 
250 		@Override
251 		public void run(){
252 			if(verbose){System.err.println("ByteFile2("+name()+").run()");}
253 			byte[] s=null;
254 			byte[][] list=null;
255 			while(list==null){
256 				try {
257 					list = qEmpty.take();
258 				} catch (InterruptedException e1) {
259 					// TODO Auto-generated catch block
260 					e1.printStackTrace();
261 				}
262 			}
263 			synchronized(this){
264 				if(list==poison || shutdown){
265 					shutdown();
266 					return;
267 				}
268 			}
269 
270 			int loc=0;
271 			long bases=0;
272 
273 			//At this point, list is not null
274 			for(s=bf1.nextLine(); s!=null; s=bf1.nextLine()){
275 				bases+=s.length;
276 				assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr;
277 				list[loc]=s;
278 				loc++;
279 //				numIn++;
280 //				if(verbose){System.err.println("Added line "+numIn);}
281 				if(loc>=bufflen || bases>=buffcapacity){
282 					if(verbose2){System.err.println("Capacity exceeded.");}
283 					while(list!=null){
284 						try {
285 //							synchronized(this){
286 //								if(!shutdown){
287 									if(verbose2){
288 										System.err.println("A: Adding to qFull list of size "+loc);
289 										System.err.println(ByteFile2.toString(list));
290 									}
291 									cntr+=list.length;
292 									qFull.put(list);
293 									if(verbose2){System.err.println("A: qFull.size()="+qFull.size());}
294 //								}
295 //							}
296 							list=null;
297 							loc=0;
298 						} catch (InterruptedException e) {
299 							// TODO Auto-generated catch block
300 							e.printStackTrace();
301 						}
302 					}
303 					//At this point, list is null
304 					if(shutdown){
305 						if(verbose2){System.err.println("Break 1");}
306 						break;
307 					}
308 					while(list==null){
309 						if(verbose2){System.err.println("Taking empty list.");}
310 						try {
311 							list = qEmpty.take();
312 						} catch (InterruptedException e1) {
313 							// TODO Auto-generated catch block
314 							e1.printStackTrace();
315 						}
316 					}
317 					//At this point, list is not null
318 					bases=0;
319 					if(list==poison){
320 						if(verbose2){System.err.println("Break 2");}
321 						break;
322 					}
323 					//At this point, list is not null
324 				}
325 			}
326 			if(verbose2){System.err.println("Run loop exit.");}
327 
328 			while(list!=null && loc>0){
329 				try {
330 //					synchronized(this){
331 //						if(!shutdown){
332 							if(verbose2){System.err.println("B: Adding list of size "+loc);}
333 							qFull.put(list);
334 							if(verbose2){System.err.println("B: qFull.size()="+qFull.size());}
335 //						}
336 //					}
337 					list=null;
338 					loc=0;
339 				} catch (InterruptedException e) {
340 					// TODO Auto-generated catch block
341 					e.printStackTrace();
342 				}
343 			}
344 			//At this point, list is null
shutdown()345 			shutdown();
346 
347 			if(verbose){System.err.println("ByteFile2("+name()+").run() finished");}
348 		}
349 
shutdown()350 		synchronized void shutdown(){
351 			if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");}
352 			if(shutdown){return;}
353 			shutdown=true;
354 			if(verbose2){System.err.println("Adding poison.");}
355 			qFull.add(poison);
356 			qEmpty.add(poison);
357 			if(verbose2){System.err.println("D: qFull.size()="+qFull.size());}
358 			if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");}
359 		}
360 
361 		private boolean shutdown=false;
362 		final ByteFile1 bf1;
363 		final ArrayBlockingQueue<byte[][]> qFull;
364 		final ArrayBlockingQueue<byte[][]> qEmpty;
365 
366 	}
367 
368 	@Override
369 	public boolean isOpen(){
370 		final byte[][] list=currentList;
371 		final int loc=currentLoc;
372 		if(list!=null && loc<list.length && list[loc]!=null){return true;}
373 		final BF1Thread bft=thread;
374 		if(bft==null){
375 			return false;
376 		}
377 		return true;
378 //		synchronized(bft){
379 //		//NOTE!!!  This cannot be used because qFull.size() will not return a correctly synchronized value.  Poll() may work.
380 //			assert(bft.bf1.isOpen() || !bft.qFull.isEmpty()) : bft.bf1.isOpen()+", "+bft.qFull.isEmpty()+", "+bft.qFull.size();
381 //			return (bft.bf1.isOpen() || !bft.qFull.isEmpty());
382 //		}
383 	}
384 
385 	@Override
386 	public final void pushBack(byte[] line){
387 		assert(pushBack==null);
388 		pushBack=line;
389 	}
390 
391 //	@Override
392 //	public void pushBack(byte[] line) {
393 //		if(bstart>line.length){
394 //			bstart--;
395 //			buffer[bstart]='\n';
396 //			for(int i=0, j=bstart-line.length; i<line.length; i++, j++){
397 //				buffer[j]=line[i];
398 //			}
399 //			bstart=bstart-line.length;
400 //			return;
401 //		}
402 //
403 //		int bLen=bstop-bstart;
404 //		int newLen=bLen+line.length+1;
405 //		int rShift=line.length+1-bstart;
406 //		assert(rShift>0) : bstop+", "+bstart+", "+line.length;
407 //		while(newLen>buffer.length){
408 //			//This could get big if pushback is used often,
409 //			//unless special steps are taken to prevent it, like leaving extra space for pushbacks.
410 //			buffer=Arrays.copyOf(buffer, buffer.length*2);
411 //		}
412 //
413 //		Tools.shiftRight(buffer, rShift);
414 //
415 //		for(int i=0; i<line.length; i++){
416 //			buffer[i]=line[i];
417 //		}
418 //		buffer[line.length]='\n';
419 //		bstart=0;
420 //		bstop=newLen;
421 //	}
422 
423 	/** For debugging */
424 	private static String toString(byte[][] x){
425 		StringBuilder sb=new StringBuilder();
426 		for(byte[] z : x){
427 			sb.append(z==null ? "null" : new String(z)).append('\n');
428 		}
429 		return sb.toString();
430 	}
431 
432 	@Override
433 	public final InputStream is(){return thread==null ? null : thread.bf1.is();}
434 
435 	@Override
436 	public final long lineNum(){return thread==null ? -1 : thread.bf1.lineNum();}
437 
438 	long cntr;
439 	private BF1Thread thread=null;
440 	private byte[][] currentList=null;
441 	private int currentLoc=0;
442 //	private int currentSize=0;
443 
444 //	private long numIn=0, numOut=0;
445 
446 	private byte[] pushBack=null;
447 
448 	static final byte[][] poison=new byte[0][];
449 	public static boolean verbose=false;
450 	private static final boolean verbose2=false;
451 	private static final int bufflen=1000;
452 	private static final int buffs=4;
453 	private static final int buffcapacity=256000;
454 
455 	private boolean errorState=false;
456 
457 }
458