1 // ======================================================================== 2 // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd. 3 // ------------------------------------------------------------------------ 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 // ======================================================================== 14 15 package org.mortbay.io.nio; 16 17 import java.io.File; 18 import java.io.FileInputStream; 19 import java.io.IOException; 20 import java.io.InputStream; 21 import java.io.OutputStream; 22 import java.nio.ByteBuffer; 23 import java.nio.channels.Channels; 24 import java.nio.channels.FileChannel; 25 import java.nio.channels.ReadableByteChannel; 26 import java.nio.channels.WritableByteChannel; 27 28 import org.mortbay.io.AbstractBuffer; 29 import org.mortbay.io.Buffer; 30 31 /* ------------------------------------------------------------------------------- */ 32 /** 33 * 34 * @author gregw 35 */ 36 public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer 37 { 38 protected ByteBuffer _buf; 39 private ReadableByteChannel _in; 40 private InputStream _inStream; 41 private WritableByteChannel _out; 42 private OutputStream _outStream; 43 DirectNIOBuffer(int size)44 public DirectNIOBuffer(int size) 45 { 46 super(READWRITE,NON_VOLATILE); 47 _buf = ByteBuffer.allocateDirect(size); 48 _buf.position(0); 49 _buf.limit(_buf.capacity()); 50 } 51 DirectNIOBuffer(ByteBuffer buffer,boolean immutable)52 public DirectNIOBuffer(ByteBuffer buffer,boolean immutable) 53 { 54 super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE); 55 if (!buffer.isDirect()) 56 throw new IllegalArgumentException(); 57 _buf = buffer; 58 setGetIndex(buffer.position()); 59 setPutIndex(buffer.limit()); 60 } 61 62 /** 63 * @param file 64 */ DirectNIOBuffer(File file)65 public DirectNIOBuffer(File file) throws IOException 66 { 67 super(READONLY,NON_VOLATILE); 68 FileInputStream fis = new FileInputStream(file); 69 FileChannel fc = fis.getChannel(); 70 _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length()); 71 setGetIndex(0); 72 setPutIndex((int)file.length()); 73 _access=IMMUTABLE; 74 } 75 76 /* ------------------------------------------------------------ */ isDirect()77 public boolean isDirect() 78 { 79 return true; 80 } 81 82 /* ------------------------------------------------------------ */ array()83 public byte[] array() 84 { 85 return null; 86 } 87 88 /* ------------------------------------------------------------ */ capacity()89 public int capacity() 90 { 91 return _buf.capacity(); 92 } 93 94 /* ------------------------------------------------------------ */ peek(int position)95 public byte peek(int position) 96 { 97 return _buf.get(position); 98 } 99 peek(int index, byte[] b, int offset, int length)100 public int peek(int index, byte[] b, int offset, int length) 101 { 102 int l = length; 103 if (index+l > capacity()) 104 { 105 l=capacity()-index; 106 if (l==0) 107 return -1; 108 } 109 110 if (l < 0) 111 return -1; 112 try 113 { 114 _buf.position(index); 115 _buf.get(b,offset,l); 116 } 117 finally 118 { 119 _buf.position(0); 120 } 121 122 return l; 123 } 124 poke(int index, byte b)125 public void poke(int index, byte b) 126 { 127 if (isReadOnly()) throw new IllegalStateException(__READONLY); 128 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0"); 129 if (index > capacity()) 130 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity()); 131 _buf.put(index,b); 132 } 133 poke(int index, Buffer src)134 public int poke(int index, Buffer src) 135 { 136 if (isReadOnly()) throw new IllegalStateException(__READONLY); 137 138 byte[] array=src.array(); 139 if (array!=null) 140 { 141 int length = poke(index,array,src.getIndex(),src.length()); 142 return length; 143 } 144 else 145 { 146 Buffer src_buf=src.buffer(); 147 if (src_buf instanceof DirectNIOBuffer) 148 { 149 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf; 150 if (src_bytebuf==_buf) 151 src_bytebuf=_buf.duplicate(); 152 try 153 { 154 _buf.position(index); 155 int space = _buf.remaining(); 156 157 int length=src.length(); 158 if (length>space) 159 length=space; 160 161 src_bytebuf.position(src.getIndex()); 162 src_bytebuf.limit(src.getIndex()+length); 163 164 _buf.put(src_bytebuf); 165 return length; 166 } 167 finally 168 { 169 _buf.position(0); 170 src_bytebuf.limit(src_bytebuf.capacity()); 171 src_bytebuf.position(0); 172 } 173 } 174 else 175 return super.poke(index,src); 176 } 177 } 178 poke(int index, byte[] b, int offset, int length)179 public int poke(int index, byte[] b, int offset, int length) 180 { 181 if (isReadOnly()) throw new IllegalStateException(__READONLY); 182 183 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0"); 184 185 if (index + length > capacity()) 186 { 187 length=capacity()-index; 188 if (length<0) 189 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity()); 190 } 191 192 try 193 { 194 _buf.position(index); 195 196 int space=_buf.remaining(); 197 198 if (length>space) 199 length=space; 200 if (length>0) 201 _buf.put(b,offset,length); 202 return length; 203 } 204 finally 205 { 206 _buf.position(0); 207 } 208 } 209 210 /* ------------------------------------------------------------ */ getByteBuffer()211 public ByteBuffer getByteBuffer() 212 { 213 return _buf; 214 } 215 216 /* ------------------------------------------------------------ */ readFrom(InputStream in, int max)217 public int readFrom(InputStream in, int max) throws IOException 218 { 219 if (_in==null || !_in.isOpen() || in!=_inStream) 220 { 221 _in=Channels.newChannel(in); 222 _inStream=in; 223 } 224 225 if (max<0 || max>space()) 226 max=space(); 227 int p = putIndex(); 228 229 try 230 { 231 int len=0, total=0, available=max; 232 int loop=0; 233 while (total<max) 234 { 235 _buf.position(p); 236 _buf.limit(p+available); 237 len=_in.read(_buf); 238 if (len<0) 239 { 240 _in=null; 241 _inStream=in; 242 break; 243 } 244 else if (len>0) 245 { 246 p += len; 247 total += len; 248 available -= len; 249 setPutIndex(p); 250 loop=0; 251 } 252 else if (loop++>1) 253 break; 254 if (in.available()<=0) 255 break; 256 } 257 if (len<0 && total==0) 258 return -1; 259 return total; 260 261 } 262 catch(IOException e) 263 { 264 _in=null; 265 _inStream=in; 266 throw e; 267 } 268 finally 269 { 270 if (_in!=null && !_in.isOpen()) 271 { 272 _in=null; 273 _inStream=in; 274 } 275 _buf.position(0); 276 _buf.limit(_buf.capacity()); 277 } 278 } 279 280 /* ------------------------------------------------------------ */ writeTo(OutputStream out)281 public void writeTo(OutputStream out) throws IOException 282 { 283 if (_out==null || !_out.isOpen() || _out!=_outStream) 284 { 285 _out=Channels.newChannel(out); 286 _outStream=out; 287 } 288 289 synchronized (_buf) 290 { 291 try 292 { 293 int loop=0; 294 while(hasContent() && _out.isOpen()) 295 { 296 _buf.position(getIndex()); 297 _buf.limit(putIndex()); 298 int len=_out.write(_buf); 299 if (len<0) 300 break; 301 else if (len>0) 302 { 303 skip(len); 304 loop=0; 305 } 306 else if (loop++>1) 307 break; 308 } 309 310 } 311 catch(IOException e) 312 { 313 _out=null; 314 _outStream=null; 315 throw e; 316 } 317 finally 318 { 319 if (_out!=null && !_out.isOpen()) 320 { 321 _out=null; 322 _outStream=null; 323 } 324 _buf.position(0); 325 _buf.limit(_buf.capacity()); 326 } 327 } 328 } 329 330 331 332 } 333