1 package org.libvirt;
2 
3 import java.io.IOException;
4 
5 import java.nio.ByteBuffer;
6 import java.nio.channels.ByteChannel;
7 import java.nio.channels.ClosedChannelException;
8 import java.nio.channels.NonReadableChannelException;
9 import java.nio.channels.NonWritableChannelException;
10 
11 import org.libvirt.jna.Libvirt;
12 import org.libvirt.jna.SizeT;
13 import org.libvirt.jna.StreamPointer;
14 import static org.libvirt.Library.libvirt;
15 import static org.libvirt.ErrorHandler.processError;
16 
17 /**
18  * The Stream class is used to transfer data between a libvirt daemon
19  * and a client.
20  * <p>
21  * It implements the ByteChannel interface.
22  * <p>
23  * Basic usage:
24  *
25  * <pre>
26  * {@code
27  * ByteBuffer buf = ByteBuffer.allocate(1024);
28  * Stream str = conn.streamNew(0);
29  *
30  * ... // open the stream e.g. calling Domain.screenshot
31  *
32  * while (str.read(buf) != -1) {
33  *     buf.flip();
34  *     ... // do something with the data
35  *     buf.compact();
36  * }}</pre>
37  * <p>
38  * If you want to use this class as an InputStream or OutputStream,
39  * convert it using the {@link java.nio.channels.Channels#newInputStream
40  *  Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream
41  *  Channels.newOutputStream} respectively.
42  */
43 public class Stream implements ByteChannel {
44 
45     public static final int VIR_STREAM_NONBLOCK = 1;
46 
47     /**
48      * the native virStreamPtr.
49      */
50     private StreamPointer vsp;
51 
52     /**
53      * The Connect Object that represents the Hypervisor of this Domain
54      */
55     private final Connect virConnect;
56 
57     private static final int CLOSED   =  0;
58     private static final int READABLE =  1;
59     private static final int WRITABLE =  2;
60     private static final int OPEN     = READABLE | WRITABLE;
61     private static final int EOF      =  4;
62 
63     /* The status of the stream. A stream starts its live in the
64      * "CLOSED" state.
65      *
66      * It will be opened for input / output by another libvirt
67      * operation (e.g. virStorageVolDownload), which means it will
68      * be in state "READABLE" or "WRITABLE", exclusively.
69      *
70      * It will reach state "EOF", if {@link finish()} is called.
71      *
72      * It will be in the "CLOSED" state again, after calling abort()
73      * or close().
74      */
75     private int state = CLOSED;
76 
markReadable()77     void markReadable() {
78         assert !isWritable()
79             : "A Stream cannot be readable and writable at the same time";
80 
81         state |= READABLE;
82     }
83 
markWritable()84     void markWritable() {
85         assert !isReadable()
86             : "A Stream cannot be readable and writable at the same time";
87 
88         state |= WRITABLE;
89     }
90 
isReadable()91     boolean isReadable() {
92         return (state & READABLE) != 0;
93     }
94 
isWritable()95     boolean isWritable() {
96         return (state & WRITABLE) != 0;
97     }
98 
isEOF()99     protected boolean isEOF() {
100         return (state & EOF) != 0;
101     }
102 
markEOF()103     private void markEOF() {
104         state |= EOF;
105     }
106 
Stream(final Connect virConnect, final StreamPointer vsp)107     Stream(final Connect virConnect, final StreamPointer vsp) {
108         this.virConnect = virConnect;
109         this.vsp = vsp;
110     }
111 
112     /**
113      * Request that the in progress data transfer be cancelled abnormally before
114      * the end of the stream has been reached
115      *
116      * @return <em>ignore</em> (always 0)
117      */
abort()118     public int abort() throws LibvirtException {
119         int returnValue = processError(libvirt.virStreamAbort(vsp));
120         this.state = CLOSED;
121         return returnValue;
122     }
123 
124     /**
125      * Register a callback to be notified when a stream becomes writable, or
126      * readable.
127      *
128      * @see <a
129      *      href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamEventAddCallback">Libvirt
130      *      Docs</a>
131      * @param events
132      *            the events to monitor
133      * @param cb
134      *            the callback method
135      * @return <em>ignore</em> (always 0)
136      * @throws LibvirtException
137      */
addCallback(final int events, final Libvirt.VirStreamEventCallback cb)138     public int addCallback(final int events, final Libvirt.VirStreamEventCallback cb)
139             throws LibvirtException {
140         return processError(libvirt.virStreamEventAddCallback(vsp, events, cb, null, null));
141     }
142 
143     @Override
finalize()144     protected void finalize() throws LibvirtException {
145         free();
146     }
147 
148     /**
149      * Indicate that there is no further data is to be transmitted on the
150      * stream.
151      *
152      * @return <em>ignore</em> (always 0)
153      * @throws LibvirtException
154      */
finish()155     public int finish() throws LibvirtException {
156         int returnValue = processError(libvirt.virStreamFinish(vsp));
157         markEOF();
158         return returnValue;
159     }
160 
161     /**
162      * Decrement the reference count on a stream, releasing the stream object if
163      * the reference count has hit zero.
164      *
165      * @return <em>ignore</em> (always 0)
166      * @throws LibvirtException
167      */
free()168     public int free() throws LibvirtException {
169         int success = 0;
170         if (vsp != null) {
171             closeStream();
172             success = processError(libvirt.virStreamFree(vsp));
173             vsp = null;
174         }
175 
176         return success;
177     }
178 
getVsp()179     StreamPointer getVsp() {
180         return vsp;
181     }
182 
183     /**
184      * Receives data from the stream into the buffer provided.
185      *
186      * @param data
187      *            buffer to put the data into
188      * @return the number of bytes read, -1 on error, -2 if the buffer is empty
189      * @throws LibvirtException
190      */
receive(final byte[] data)191     public int receive(final byte[] data) throws LibvirtException {
192         return receive(ByteBuffer.wrap(data));
193     }
194 
receive(final ByteBuffer buffer)195     protected int receive(final ByteBuffer buffer) throws LibvirtException {
196         int returnValue = processError(libvirt.virStreamRecv(vsp, buffer, new SizeT(buffer.remaining())));
197         buffer.position(buffer.position() + returnValue);
198         return returnValue;
199     }
200 
201     @Override
read(final ByteBuffer buffer)202     public int read(final ByteBuffer buffer) throws IOException {
203         if (!isOpen()) {
204             throw new ClosedChannelException();
205         }
206 
207         if (!isReadable()) {
208             throw new NonReadableChannelException();
209         }
210 
211         if (isEOF()) {
212             return -1;
213         }
214 
215         try {
216             int ret = receive(buffer);
217 
218             switch (ret) {
219             case 0:
220                 finish();
221                 return -1;
222 
223             case -2:
224                 throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
225 
226             default:
227                 return ret;
228             }
229         } catch (LibvirtException e) {
230             throw new IOException("could not read from stream", e);
231         }
232     }
233 
234     @Override
write(final ByteBuffer buffer)235     public int write(final ByteBuffer buffer) throws IOException {
236         if (!isOpen()) {
237             throw new ClosedChannelException();
238         }
239 
240         if (!isWritable()) {
241             throw new NonWritableChannelException();
242         }
243 
244         int pos = buffer.position();
245 
246         try {
247             while (buffer.hasRemaining()) {
248                 int ret = send(buffer);
249 
250                 if (ret == -2) {
251                     throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
252                 }
253             }
254             return buffer.position() - pos;
255         } catch (LibvirtException e) {
256             throw new IOException("could not write to stream", e);
257         }
258     }
259 
closeStream()260     protected void closeStream() throws LibvirtException {
261         if (isOpen() && !isEOF()) {
262             if (isWritable()) {
263                 finish();
264             } else if (isReadable()) {
265                 abort();
266             }
267         }
268         this.state = CLOSED;
269     }
270 
271     @Override
close()272     public void close() throws IOException {
273         try {
274             closeStream();
275         } catch (LibvirtException e) {
276             throw new IOException("error while closing Stream", e);
277         }
278     }
279 
280     @Override
isOpen()281     public boolean isOpen() {
282         return (this.state & OPEN) != 0;
283     }
284 
285     /**
286      * Batch receive method
287      *
288      * @see <a href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamRecvAll">virStreamRecvAll</a>
289      * @param handler
290      *            the callback handler
291      * @return <em>ignore</em> (always 0)
292      * @throws LibvirtException
293      */
receiveAll(final Libvirt.VirStreamSinkFunc handler)294     public int receiveAll(final Libvirt.VirStreamSinkFunc handler)
295             throws LibvirtException {
296         return processError(libvirt.virStreamRecvAll(vsp, handler, null));
297     }
298 
299     /**
300      * Remove an event callback from the stream
301      *
302      * @see <a href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamEventRemoveCallback">Libvirt Docs</a>
303      * @return <em>ignore</em> (always 0)
304      * @throws LibvirtException
305      */
removeCallback()306     public int removeCallback() throws LibvirtException {
307         return processError(libvirt.virStreamEventRemoveCallback(vsp));
308     }
309 
310     /**
311      * Write a series of bytes to the stream.
312      *
313      * @param data
314      *            the data to write
315      * @return the number of bytes written, -1 on error, -2 if the buffer is
316      *         full
317      * @throws LibvirtException
318      */
send(final byte[] data)319     public int send(final byte[] data) throws LibvirtException {
320         return send(ByteBuffer.wrap(data));
321     }
322 
send(final ByteBuffer buffer)323     protected int send(final ByteBuffer buffer) throws LibvirtException {
324         SizeT size = new SizeT(buffer.remaining());
325         int returnValue = processError(libvirt.virStreamSend(vsp, buffer, size));
326         buffer.position(buffer.position() + returnValue);
327         return returnValue;
328     }
329 
330     /**
331      * Batch send method
332      *
333      * @see <a
334      *      href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamSendAll">Libvirt
335      *      Documentation</a>
336      * @param handler
337      *            the callback handler
338      * @return <em>ignore</em> (always 0)
339      * @throws LibvirtException
340      */
sendAll(final Libvirt.VirStreamSourceFunc handler)341     public int sendAll(final Libvirt.VirStreamSourceFunc handler)
342             throws LibvirtException {
343         return processError(libvirt.virStreamSendAll(vsp, handler, null));
344     }
345 
346     /**
347      * Changes the set of events to monitor for a stream.
348      *
349      * @see <a href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamEventUpdateCallback">Libvirt Docs</a>
350      * @param events
351      *            the events to monitor
352      * @return <em>ignore</em> (always 0)
353      * @throws LibvirtException
354      */
updateCallback(final int events)355     public int updateCallback(final int events) throws LibvirtException {
356         return processError(libvirt.virStreamEventUpdateCallback(vsp, events));
357     }
358 }
359