1 /* 2 * Copyright (c) 2000 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.lib.aDisk; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.core.*; 29 import java.io.*; 30 31 /** 32 * This is an implementation of AFile which uses a pool of threads 33 * which perform blocking I/O (through the java.io.RandomAccessFile 34 * class) on files. This is a portable implementation but is not 35 * intended to be high-performance. 36 * 37 * @author Matt Welsh 38 * @see AFile 39 */ 40 class AFileTPImpl extends AFileImpl implements QueueElementIF { 41 42 private File f; 43 RandomAccessFile raf; 44 private AFile afile; 45 private AFileTPTM tm; 46 private SinkIF compQ; 47 private FiniteQueue eventQ; 48 private boolean readOnly; 49 private boolean closed; 50 51 /** 52 * Create an AFileTPIMpl with the given AFile, filename, completion 53 * queue, create/readOnly flags, and Thread Manager. 54 */ AFileTPImpl(AFile afile, String fname, SinkIF compQ, boolean create, boolean readOnly, AFileTPTM tm)55 AFileTPImpl(AFile afile, String fname, SinkIF compQ, boolean create, boolean readOnly, AFileTPTM tm) throws IOException { 56 this.afile = afile; 57 this.tm = tm; 58 this.compQ = compQ; 59 this.readOnly = readOnly; 60 61 eventQ = new FiniteQueue(); 62 63 f = new File(fname); 64 if (!f.exists() && !create) { 65 throw new FileNotFoundException("File not found: "+fname); 66 } 67 if (f.isDirectory()) { 68 throw new FileIsDirectoryException("Is a directory: "+fname); 69 } 70 71 if (readOnly) { 72 raf = new RandomAccessFile(f, "r"); 73 } else { 74 raf = new RandomAccessFile(f, "rw"); 75 } 76 closed = false; 77 } 78 79 /** 80 * Enqueues the given request (which must be an AFileRequest) 81 * to the file. 82 */ enqueue(QueueElementIF req)83 public void enqueue(QueueElementIF req) throws SinkException { 84 AFileRequest areq = (AFileRequest)req; 85 if (closed) { 86 throw new SinkClosedException("Sink is closed"); 87 } 88 if (readOnly && (areq instanceof AFileWriteRequest)) { 89 throw new BadQueueElementException("Cannot enqueue write request for read-only file", areq); 90 } 91 areq.afile = afile; 92 try { 93 eventQ.enqueue(areq); 94 } catch (SinkException se) { 95 throw new InternalError("AFileTPImpl.enqueue got SinkException - this should not happen, please contact <mdw@cs.berkeley.edu>"); 96 } 97 if (eventQ.size() == 1) { 98 tm.fileReady(this); 99 } 100 } 101 102 /** 103 * Enqueues the given request (which must be an AFileRequest) 104 * to the file. 105 */ enqueue_lossy(QueueElementIF req)106 public boolean enqueue_lossy(QueueElementIF req) { 107 AFileRequest areq = (AFileRequest)req; 108 if (closed || (readOnly && (areq instanceof AFileWriteRequest))) { 109 return false; 110 } 111 areq.afile = afile; 112 try { 113 eventQ.enqueue(areq); 114 } catch (SinkException se) { 115 throw new InternalError("AFileTPImpl.enqueue got SinkException - this should not happen, please contact <mdw@cs.berkeley.edu>"); 116 } 117 if (eventQ.size() == 1) { 118 tm.fileReady(this); 119 } 120 return true; 121 } 122 123 /** 124 * Enqueues the given requests (which must be AFileRequests) 125 * to the file. 126 */ enqueue_many(QueueElementIF[] elements)127 public void enqueue_many(QueueElementIF[] elements) throws SinkException { 128 if (closed) { 129 throw new SinkClosedException("Sink is closed"); 130 } 131 for (int i = 0; i < elements.length; i++) { 132 enqueue(elements[i]); 133 } 134 } 135 136 /** 137 * Return information on the properties of the file. 138 */ stat()139 AFileStat stat() { 140 AFileStat s = new AFileStat(); 141 s.afile = afile; 142 s.isDirectory = f.isDirectory(); 143 s.canRead = f.canRead(); 144 s.canWrite = f.canWrite(); 145 s.length = f.length(); 146 return s; 147 } 148 149 /** 150 * Close the file after all enqueued requests have completed. 151 * Disallows any additional requests to be enqueued on this file. 152 * A SinkClosedEvent will be posted on the file's completion queue 153 * when the close is complete. 154 */ close()155 public void close() { 156 enqueue_lossy(new AFileCloseRequest(afile, compQ)); 157 closed = true; 158 } 159 160 /** 161 * Causes a SinkFlushedEvent to be posted on the file's completion queue 162 * when all pending requests have completed. 163 */ flush()164 public void flush() { 165 enqueue_lossy(new AFileFlushRequest(afile, compQ)); 166 } 167 168 /** 169 * Return the per-file event queue. 170 */ getQueue()171 QueueIF getQueue() { 172 return eventQ; 173 } 174 175 } 176 177 178