package fileIO;

import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

import shared.Timer;
import shared.Tools;


/**
 * Runs a ByteFile1 in a separate thread. Can speed up disk reading, particularly of compressed files, at cost of slightly more work done.
 * Drop-in compatible with ByteFile1.
 * <p>
 * A producer thread ({@link BF1Thread}) reads lines from a ByteFile1 into {@code byte[][]} buffers
 * taken from {@code qEmpty} and hands filled buffers over through {@code qFull}. The consumer
 * ({@link #nextLine()}) takes buffers from {@code qFull}, and returns each exhausted buffer,
 * cleared, to {@code qEmpty}. The zero-length {@link #poison} array is placed on both queues
 * to signal shutdown / end of input.
 * @author Brian Bushnell
 * @date Sep 23, 2013
 *
 */
public final class ByteFile2 extends ByteFile {

	/**
	 * Prints a range of lines from a file, or times reading the whole file.
	 * @param args args[0]: file name (defaults to "stdin");
	 * args[1]: "speedtest" to time reading every line without printing, otherwise the index of the first line to print;
	 * args[2]: optional index at which to stop (exclusive).
	 */
	public static void main(String[] args){
		ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true);
		long first=0, last=100;
		boolean speedtest=false;
		if(args.length>1){
			if(args[1].equalsIgnoreCase("speedtest")){
				speedtest=true;
				first=0;
				last=Long.MAX_VALUE;
			}else{
				//first and last are longs, so parse them as longs; parseInt rejected indices above Integer.MAX_VALUE.
				first=Long.parseLong(args[1]);
				last=first+100;
			}
		}
		if(args.length>2){
			last=Long.parseLong(args[2]);
		}
		speedtest(tf, first, last, !speedtest);

		//Exercises the close/reset/close cycle.
		tf.close();
		tf.reset();
		tf.close();
	}

	/**
	 * Skips {@code first} lines, then reads lines up to index {@code last} (exclusive) or end of input.
	 * @param tf File to read
	 * @param first Number of leading lines to skip
	 * @param last Line index at which to stop
	 * @param reprint If true, echo each line to stdout and print line/byte totals to stderr;
	 * if false, only count, then print a timing summary to stderr
	 */
	private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){
		Timer t=new Timer();
		long lines=0;
		long bytes=0;
		for(long i=0; i<first; i++){tf.nextLine();}
		if(reprint){
			for(long i=first; i<last; i++){
				byte[] s=tf.nextLine();
				if(s==null){break;}

				lines++;
				bytes+=s.length;
				System.out.println(new String(s));
			}

			System.err.println("\n");
			System.err.println("Lines: "+lines);
			System.err.println("Bytes: "+bytes);
		}else{
			for(long i=first; i<last; i++){
				byte[] s=tf.nextLine();
				if(s==null){break;}
				lines++;
				bytes+=s.length;
			}
		}
		t.stop();

		if(!reprint){
			System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8));
		}
	}

//	public ByteFile2(String name()){this(name(), false);}

	/**
	 * Opens the named file as text input.
	 * @param fname File name, passed to FileFormat.testInput
	 * @param allowSubprocess_ Passed through to FileFormat.testInput
	 */
	public ByteFile2(String fname, boolean allowSubprocess_){
		this(FileFormat.testInput(fname, FileFormat.TEXT, null, allowSubprocess_, false));
	}

	/**
	 * Creates the reader and immediately starts the background reading thread.
	 * @param ff Format descriptor of the file to read
	 */
	public ByteFile2(FileFormat ff){
		super(ff);
		if(verbose){System.err.println("ByteFile2("+ff+")");}
		open();
	}

	/** Closes the file, starts a fresh reader thread, then calls the inherited superReset(). */
	@Override
	public final void reset(){
		close();
		open();
		superReset();
	}

	/**
	 * Shuts down the reader thread, waits for it to terminate, closes the wrapped ByteFile1,
	 * and clears all buffered state. Does nothing to the thread if none is active,
	 * so calling it repeatedly is safe.
	 * @return The accumulated error state, including whatever the wrapped ByteFile1's close() reported
	 */
	@Override
	public synchronized final boolean close(){
		if(verbose){System.err.println("ByteFile2("+name()+").close()");}
		//Snapshot the field once so the null check and every later use see the same object.
		final BF1Thread bft=thread;
		if(bft!=null){
//			errorState|=ReadWrite.killProcess(name());
			bft.shutdown();
			while(bft.getState()!=Thread.State.TERMINATED){
				try {
					bft.join();
				} catch (InterruptedException e) {
					//Interrupted while waiting; log it and keep waiting for termination.
					e.printStackTrace();
				}
			}
			//Previously this result was discarded, so close() could never report a failure.
			errorState|=bft.bf1.close();
		}
		thread=null;
		currentList=null;
		currentLoc=0;
//		assert(numIn==numOut) : numIn+", "+numOut;
		pushBack=null;
		if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);}
		return errorState;
	}

	/**
	 * Returns the next line, taking a pushed-back line first if one is present.
	 * @return The next line, or null when no more lines are available
	 */
	@Override
	public final byte[] nextLine(){

		if(pushBack!=null){//Commenting out does not seem to improve speed here.
			byte[] temp=pushBack;
			pushBack=null;
			return temp;
		}

//		if(verbose){System.err.println("Reading line.");}
//		byte[] r=null;

		byte[][] temp=currentList;
		int tempLoc=currentLoc;

		//A null slot marks the end of a partially filled buffer, so it also triggers a refill.
		if(temp==null || tempLoc>=temp.length || temp[tempLoc]==null){
			boolean b=getBuffer();
			if(!b){
				if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");}
				return null;
			}
			temp=currentList;
			tempLoc=currentLoc;
			if(temp==null || temp==poison || temp[tempLoc]==null){
				return null;
			}
		}

		//TODO: This is a race condition; currentList can be changed to null. A defensive copy could be created.
		//Note that I read the above warning and added "temp" and "temploc" but I'm not sure if that fixed anything.
		assert(temp!=null && temp!=poison);
		assert(tempLoc<temp.length);
		assert(temp[tempLoc]!=null);
		byte[] r=temp[tempLoc];
		assert(r!=null);
		currentLoc++;
//		numOut++;
		return r;
	}

	/**
	 * Returns the exhausted current buffer (cleared) to the reader thread and blocks until a filled one arrives.
	 * @return true if a buffer of lines was fetched; false if there is no reader thread or the poison marker was reached
	 */
	private boolean getBuffer(){
		if(verbose2){System.err.println("Getting new buffer.");}
		currentLoc=0;
		final BF1Thread bft=thread;
		if(bft==null){
			currentList=null;
			if(verbose2){System.err.println("No buffers available. thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));}
			return false;
		}
		if(currentList==poison){
			if(verbose2){System.err.println("A: Current list is poison.");}
			return false;
		}
		if(currentList!=null){
			Arrays.fill(currentList, null); //MUST be done or lines get recycled at end of file.
			while(currentList!=null){
				try {
					if(verbose2){System.err.println("adding to qEmpty list size "+currentList.length+"\n"+toString(currentList));}
					bft.qEmpty.put(currentList);
					currentList=null;
				} catch (InterruptedException e) {
					//Interrupted before the buffer was handed back; log and retry.
					e.printStackTrace();
				}
			}
		}
		assert(currentList==null);
		while(currentList==null){
			try {
				assert(bft!=null);
				if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());}
				currentList=bft.qFull.take();
			} catch (InterruptedException e) {
				//Interrupted while waiting for a filled buffer; log and retry.
				e.printStackTrace();
			}
		}
		if(verbose2){
			if(currentList==poison){
				System.err.println("B: Current list is poison.");
			}else{
				System.err.println("getBuffer fetched a new buffer of size "+currentList.length);
			}
		}
		return currentList!=poison;
	}

	/**
	 * Resets the consumer position and starts a new reader thread. Requires that no thread is currently set.
	 * @return The newly started thread
	 */
	private final synchronized BF1Thread open(){
		if(verbose2){System.err.println("ByteFile2("+name()+").open()");}
		assert(thread==null);
		currentList=null;
		currentLoc=0;
//		numIn=0;
//		numOut=0;
		thread=new BF1Thread(ff);
		thread.start();
		return thread;
	}

	/**
	 * Producer thread: reads lines from a ByteFile1 into buffers drawn from qEmpty
	 * and publishes each filled buffer on qFull.
	 */
	private class BF1Thread extends Thread{

//		public BF1Thread(String fname){
//			bf1=new ByteFile1(fname, false, allowSubprocess);
//			qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
//			qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
//			for(int i=0; i<buffs; i++){
//				try {
//					qEmpty.put(new byte[bufflen][]);
//				} catch (InterruptedException e) {
//					// TODO Auto-generated catch block
//					e.printStackTrace();
//				}
//			}
//		}

		/**
		 * Wraps a new ByteFile1 and preloads qEmpty with {@code buffs} empty buffers of {@code bufflen} slots each.
		 * @param ff Format descriptor of the file to read
		 */
		public BF1Thread(FileFormat ff){
			bf1=new ByteFile1(ff);
			//Capacity is buffs+2, leaving room for the poison marker beside all of the buffers.
			qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
			qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
			for(int i=0; i<buffs; i++){
				try {
					qEmpty.put(new byte[bufflen][]);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}

		/**
		 * Fills buffers with lines until the input is exhausted, shutdown is requested, or poison is taken from qEmpty;
		 * then publishes any partially filled buffer and calls shutdown().
		 */
		@Override
		public void run(){
			if(verbose){System.err.println("ByteFile2("+name()+").run()");}
			byte[] s=null;
			byte[][] list=null;
			while(list==null){
				try {
					list = qEmpty.take();
				} catch (InterruptedException e1) {
					//Interrupted while waiting for an empty buffer; log and retry.
					e1.printStackTrace();
				}
			}
			synchronized(this){
				if(list==poison || shutdown){
					shutdown();
					return;
				}
			}

			int loc=0;
			long bases=0;

			//At this point, list is not null
			for(s=bf1.nextLine(); s!=null; s=bf1.nextLine()){
				bases+=s.length;
				assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr;
				list[loc]=s;
				loc++;
//				numIn++;
//				if(verbose){System.err.println("Added line "+numIn);}
				//Publish the buffer once it holds bufflen lines or at least buffcapacity bytes.
				if(loc>=bufflen || bases>=buffcapacity){
					if(verbose2){System.err.println("Capacity exceeded.");}
					while(list!=null){
						try {
//							synchronized(this){
//								if(!shutdown){
									if(verbose2){
										System.err.println("A: Adding to qFull list of size "+loc);
										System.err.println(ByteFile2.toString(list));
									}
									cntr+=list.length;
									qFull.put(list);
									if(verbose2){System.err.println("A: qFull.size()="+qFull.size());}
//								}
//							}
							list=null;
							loc=0;
						} catch (InterruptedException e) {
							//Interrupted before the buffer was published; log and retry.
							e.printStackTrace();
						}
					}
					//At this point, list is null
					if(shutdown){
						if(verbose2){System.err.println("Break 1");}
						break;
					}
					while(list==null){
						if(verbose2){System.err.println("Taking empty list.");}
						try {
							list = qEmpty.take();
						} catch (InterruptedException e1) {
							//Interrupted while waiting for an empty buffer; log and retry.
							e1.printStackTrace();
						}
					}
					//At this point, list is not null
					bases=0;
					if(list==poison){
						if(verbose2){System.err.println("Break 2");}
						break;
					}
					//At this point, list is not null
				}
			}
			if(verbose2){System.err.println("Run loop exit.");}

			//Publish the final, partially filled buffer, if any.
			while(list!=null && loc>0){
				try {
//					synchronized(this){
//						if(!shutdown){
							if(verbose2){System.err.println("B: Adding list of size "+loc);}
							qFull.put(list);
							if(verbose2){System.err.println("B: qFull.size()="+qFull.size());}
//						}
//					}
					list=null;
					loc=0;
				} catch (InterruptedException e) {
					//Interrupted before the buffer was published; log and retry.
					e.printStackTrace();
				}
			}
			//At this point, list is null
			shutdown();

			if(verbose){System.err.println("ByteFile2("+name()+").run() finished");}
		}

		/**
		 * Marks this thread as shut down and adds the poison marker to both queues.
		 * Only the first call has any effect.
		 */
		synchronized void shutdown(){
			if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");}
			if(shutdown){return;}
			shutdown=true;
			if(verbose2){System.err.println("Adding poison.");}
			qFull.add(poison);
			qEmpty.add(poison);
			if(verbose2){System.err.println("D: qFull.size()="+qFull.size());}
			if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");}
		}

		/** Set once by shutdown(); checked by run() to stop reading. */
		private boolean shutdown=false;
		/** The underlying reader that supplies the lines. */
		final ByteFile1 bf1;
		/** Filled buffers, passed from this thread to the consumer. */
		final ArrayBlockingQueue<byte[][]> qFull;
		/** Emptied buffers, passed from the consumer back to this thread. */
		final ArrayBlockingQueue<byte[][]> qEmpty;

	}

	/**
	 * @return true if a buffered line is still available at the current position, or a reader thread exists
	 */
	@Override
	public boolean isOpen(){
		final byte[][] list=currentList;
		final int loc=currentLoc;
		if(list!=null && loc<list.length && list[loc]!=null){return true;}
		final BF1Thread bft=thread;
		if(bft==null){
			return false;
		}
		return true;
//		synchronized(bft){
//			//NOTE!!! This cannot be used because qFull.size() will not return a correctly synchronized value. Poll() may work.
//			assert(bft.bf1.isOpen() || !bft.qFull.isEmpty()) : bft.bf1.isOpen()+", "+bft.qFull.isEmpty()+", "+bft.qFull.size();
//			return (bft.bf1.isOpen() || !bft.qFull.isEmpty());
//		}
	}

	/**
	 * Stores a single line to be returned by the next call to nextLine().
	 * Only one line can be held at a time (enforced by assertion).
	 * @param line Line to push back
	 */
	@Override
	public final void pushBack(byte[] line){
		assert(pushBack==null);
		pushBack=line;
	}

//	@Override
//	public void pushBack(byte[] line) {
//		if(bstart>line.length){
//			bstart--;
//			buffer[bstart]='\n';
//			for(int i=0, j=bstart-line.length; i<line.length; i++, j++){
//				buffer[j]=line[i];
//			}
//			bstart=bstart-line.length;
//			return;
//		}
//
//		int bLen=bstop-bstart;
//		int newLen=bLen+line.length+1;
//		int rShift=line.length+1-bstart;
//		assert(rShift>0) : bstop+", "+bstart+", "+line.length;
//		while(newLen>buffer.length){
//			//This could get big if pushback is used often,
//			//unless special steps are taken to prevent it, like leaving extra space for pushbacks.
//			buffer=Arrays.copyOf(buffer, buffer.length*2);
//		}
//
//		Tools.shiftRight(buffer, rShift);
//
//		for(int i=0; i<line.length; i++){
//			buffer[i]=line[i];
//		}
//		buffer[line.length]='\n';
//		bstart=0;
//		bstop=newLen;
//	}

	/** For debugging */
	private static String toString(byte[][] x){
		StringBuilder sb=new StringBuilder();
		for(byte[] z : x){
			sb.append(z==null ? "null" : new String(z)).append('\n');
		}
		return sb.toString();
	}

	/** @return The wrapped ByteFile1's input stream, or null if there is no reader thread */
	@Override
	public final InputStream is(){
		//Read the field once; it can be set to null by close() between a check and a dereference.
		final BF1Thread bft=thread;
		return bft==null ? null : bft.bf1.is();
	}

	/** @return The wrapped ByteFile1's line number, or -1 if there is no reader thread */
	@Override
	public final long lineNum(){
		final BF1Thread bft=thread;
		return bft==null ? -1 : bft.bf1.lineNum();
	}

	/** Incremented by the buffer length each time the reader thread publishes a full buffer; used only in an assertion message. */
	long cntr;
	/** Active reader thread, or null when closed. */
	private BF1Thread thread=null;
	/** Buffer currently being consumed by nextLine(). */
	private byte[][] currentList=null;
	/** Index of the next line to return from currentList. */
	private int currentLoc=0;
//	private int currentSize=0;

//	private long numIn=0, numOut=0;

	/** Single-line pushback slot; null when empty. */
	private byte[] pushBack=null;

	/** Zero-length sentinel placed on the queues to signal shutdown. */
	static final byte[][] poison=new byte[0][];
	public static boolean verbose=false;
	private static final boolean verbose2=false;
	/** Number of line slots per buffer. */
	private static final int bufflen=1000;
	/** Number of buffers in circulation. */
	private static final int buffs=4;
	/** Byte threshold at which a buffer is published even if not all slots are used. */
	private static final int buffcapacity=256000;

	/** Sticky error flag returned by close(). */
	private boolean errorState=false;

}