1 // ========================================================================
2 // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
3 // ------------------------------------------------------------------------
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 // ========================================================================
14 
15 package org.mortbay.io.nio;
16 
17 import java.io.File;
18 import java.io.FileInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.Channels;
24 import java.nio.channels.FileChannel;
25 import java.nio.channels.ReadableByteChannel;
26 import java.nio.channels.WritableByteChannel;
27 
28 import org.mortbay.io.AbstractBuffer;
29 import org.mortbay.io.Buffer;
30 
31 /* ------------------------------------------------------------------------------- */
32 /**
33  *
34  * @author gregw
35  */
36 public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
37 {
38     protected ByteBuffer _buf;
39     private ReadableByteChannel _in;
40     private InputStream _inStream;
41     private WritableByteChannel _out;
42     private OutputStream _outStream;
43 
DirectNIOBuffer(int size)44     public DirectNIOBuffer(int size)
45     {
46         super(READWRITE,NON_VOLATILE);
47         _buf = ByteBuffer.allocateDirect(size);
48         _buf.position(0);
49         _buf.limit(_buf.capacity());
50     }
51 
DirectNIOBuffer(ByteBuffer buffer,boolean immutable)52     public DirectNIOBuffer(ByteBuffer buffer,boolean immutable)
53     {
54         super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE);
55         if (!buffer.isDirect())
56             throw new IllegalArgumentException();
57         _buf = buffer;
58         setGetIndex(buffer.position());
59         setPutIndex(buffer.limit());
60     }
61 
62     /**
63      * @param file
64      */
DirectNIOBuffer(File file)65     public DirectNIOBuffer(File file) throws IOException
66     {
67         super(READONLY,NON_VOLATILE);
68         FileInputStream fis = new FileInputStream(file);
69         FileChannel fc = fis.getChannel();
70         _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
71         setGetIndex(0);
72         setPutIndex((int)file.length());
73         _access=IMMUTABLE;
74     }
75 
76     /* ------------------------------------------------------------ */
isDirect()77     public boolean isDirect()
78     {
79         return true;
80     }
81 
82     /* ------------------------------------------------------------ */
array()83     public byte[] array()
84     {
85         return null;
86     }
87 
88     /* ------------------------------------------------------------ */
capacity()89     public int capacity()
90     {
91         return _buf.capacity();
92     }
93 
94     /* ------------------------------------------------------------ */
peek(int position)95     public byte peek(int position)
96     {
97         return _buf.get(position);
98     }
99 
peek(int index, byte[] b, int offset, int length)100     public int peek(int index, byte[] b, int offset, int length)
101     {
102         int l = length;
103         if (index+l > capacity())
104         {
105             l=capacity()-index;
106             if (l==0)
107                 return -1;
108         }
109 
110         if (l < 0)
111             return -1;
112         try
113         {
114             _buf.position(index);
115             _buf.get(b,offset,l);
116         }
117         finally
118         {
119             _buf.position(0);
120         }
121 
122         return l;
123     }
124 
poke(int index, byte b)125     public void poke(int index, byte b)
126     {
127         if (isReadOnly()) throw new IllegalStateException(__READONLY);
128         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
129         if (index > capacity())
130                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
131         _buf.put(index,b);
132     }
133 
poke(int index, Buffer src)134     public int poke(int index, Buffer src)
135     {
136         if (isReadOnly()) throw new IllegalStateException(__READONLY);
137 
138         byte[] array=src.array();
139         if (array!=null)
140         {
141             int length = poke(index,array,src.getIndex(),src.length());
142             return length;
143         }
144         else
145         {
146             Buffer src_buf=src.buffer();
147             if (src_buf instanceof DirectNIOBuffer)
148             {
149                 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
150                 if (src_bytebuf==_buf)
151                     src_bytebuf=_buf.duplicate();
152                 try
153                 {
154                     _buf.position(index);
155                     int space = _buf.remaining();
156 
157                     int length=src.length();
158                     if (length>space)
159                         length=space;
160 
161                     src_bytebuf.position(src.getIndex());
162                     src_bytebuf.limit(src.getIndex()+length);
163 
164                     _buf.put(src_bytebuf);
165                     return length;
166                 }
167                 finally
168                 {
169                     _buf.position(0);
170                     src_bytebuf.limit(src_bytebuf.capacity());
171                     src_bytebuf.position(0);
172                 }
173             }
174             else
175                 return super.poke(index,src);
176         }
177     }
178 
poke(int index, byte[] b, int offset, int length)179     public int poke(int index, byte[] b, int offset, int length)
180     {
181         if (isReadOnly()) throw new IllegalStateException(__READONLY);
182 
183         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
184 
185         if (index + length > capacity())
186         {
187             length=capacity()-index;
188             if (length<0)
189                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
190         }
191 
192         try
193         {
194             _buf.position(index);
195 
196             int space=_buf.remaining();
197 
198             if (length>space)
199                 length=space;
200             if (length>0)
201                 _buf.put(b,offset,length);
202             return length;
203         }
204         finally
205         {
206             _buf.position(0);
207         }
208     }
209 
210     /* ------------------------------------------------------------ */
getByteBuffer()211     public ByteBuffer getByteBuffer()
212     {
213         return _buf;
214     }
215 
216     /* ------------------------------------------------------------ */
readFrom(InputStream in, int max)217     public int readFrom(InputStream in, int max) throws IOException
218     {
219         if (_in==null || !_in.isOpen() || in!=_inStream)
220         {
221             _in=Channels.newChannel(in);
222             _inStream=in;
223         }
224 
225         if (max<0 || max>space())
226             max=space();
227         int p = putIndex();
228 
229         try
230         {
231             int len=0, total=0, available=max;
232             int loop=0;
233             while (total<max)
234             {
235                 _buf.position(p);
236                 _buf.limit(p+available);
237                 len=_in.read(_buf);
238                 if (len<0)
239                 {
240                     _in=null;
241                     _inStream=in;
242                     break;
243                 }
244                 else if (len>0)
245                 {
246                     p += len;
247                     total += len;
248                     available -= len;
249                     setPutIndex(p);
250                     loop=0;
251                 }
252                 else if (loop++>1)
253                     break;
254                 if (in.available()<=0)
255                     break;
256             }
257             if (len<0 && total==0)
258                 return -1;
259             return total;
260 
261         }
262         catch(IOException e)
263         {
264             _in=null;
265             _inStream=in;
266             throw e;
267         }
268         finally
269         {
270             if (_in!=null && !_in.isOpen())
271             {
272                 _in=null;
273                 _inStream=in;
274             }
275             _buf.position(0);
276             _buf.limit(_buf.capacity());
277         }
278     }
279 
280     /* ------------------------------------------------------------ */
writeTo(OutputStream out)281     public void writeTo(OutputStream out) throws IOException
282     {
283         if (_out==null || !_out.isOpen() || _out!=_outStream)
284         {
285             _out=Channels.newChannel(out);
286             _outStream=out;
287         }
288 
289         synchronized (_buf)
290         {
291             try
292             {
293                 int loop=0;
294                 while(hasContent() && _out.isOpen())
295                 {
296                     _buf.position(getIndex());
297                     _buf.limit(putIndex());
298                     int len=_out.write(_buf);
299                     if (len<0)
300                         break;
301                     else if (len>0)
302                     {
303                         skip(len);
304                         loop=0;
305                     }
306                     else if (loop++>1)
307                         break;
308                 }
309 
310             }
311             catch(IOException e)
312             {
313                 _out=null;
314                 _outStream=null;
315                 throw e;
316             }
317             finally
318             {
319                 if (_out!=null && !_out.isOpen())
320                 {
321                     _out=null;
322                     _outStream=null;
323                 }
324                 _buf.position(0);
325                 _buf.limit(_buf.capacity());
326             }
327         }
328     }
329 
330 
331 
332 }
333