package org.libvirt;

import java.io.IOException;

import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NonReadableChannelException;
import java.nio.channels.NonWritableChannelException;

import org.libvirt.jna.Libvirt;
import org.libvirt.jna.SizeT;
import org.libvirt.jna.StreamPointer;
import static org.libvirt.Library.libvirt;
import static org.libvirt.ErrorHandler.processError;

/**
 * The Stream class is used to transfer data between a libvirt daemon
 * and a client.
 * <p>
 * It implements the ByteChannel interface.
 * <p>
 * Basic usage:
 *
 * <pre>
 * {@code
 * ByteBuffer buf = ByteBuffer.allocate(1024);
 * Stream str = conn.streamNew(0);
 *
 * ... // open the stream e.g. calling Domain.screenshot
 *
 * while (str.read(buf) != -1) {
 *     buf.flip();
 *     ... // do something with the data
 *     buf.compact();
 * }}</pre>
 * <p>
 * If you want to use this class as an InputStream or OutputStream,
 * convert it using the {@link java.nio.channels.Channels#newInputStream
 * Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream
 * Channels.newOutputStream} respectively.
 */
public class Stream implements ByteChannel {

    public static final int VIR_STREAM_NONBLOCK = 1;

    /**
     * the native virStreamPtr.
     */
    private StreamPointer vsp;

    /**
     * The Connect object this Stream was created with.
     */
    private final Connect virConnect;

    private static final int CLOSED   = 0;
    private static final int READABLE = 1;
    private static final int WRITABLE = 2;
    private static final int OPEN     = READABLE | WRITABLE;
    private static final int EOF      = 4;

    /* The status of the stream. A stream starts its life in the
     * "CLOSED" state.
     *
     * It will be opened for input / output by another libvirt
     * operation (e.g. virStorageVolDownload), which means it will
     * be in state "READABLE" or "WRITABLE", exclusively.
     *
     * It will reach state "EOF" if finish() is called.
     *
     * It will be in the "CLOSED" state again after calling abort()
     * or close().
     */
    private int state = CLOSED;

    /**
     * Flags this stream as opened for reading.
     */
    void markReadable() {
        assert !isWritable()
            : "A Stream cannot be readable and writable at the same time";

        state |= READABLE;
    }

    /**
     * Flags this stream as opened for writing.
     */
    void markWritable() {
        assert !isReadable()
            : "A Stream cannot be readable and writable at the same time";

        state |= WRITABLE;
    }

    /**
     * @return true if the READABLE bit is set in the stream state
     */
    boolean isReadable() {
        return (state & READABLE) != 0;
    }

    /**
     * @return true if the WRITABLE bit is set in the stream state
     */
    boolean isWritable() {
        return (state & WRITABLE) != 0;
    }

    /**
     * @return true if the EOF bit is set in the stream state
     */
    protected boolean isEOF() {
        return (state & EOF) != 0;
    }

    private void markEOF() {
        state |= EOF;
    }

    Stream(final Connect virConnect, final StreamPointer vsp) {
        this.virConnect = virConnect;
        this.vsp = vsp;
    }

    /**
     * Request that the in progress data transfer be cancelled abnormally before
     * the end of the stream has been reached.
     * <p>
     * On success the stream is put into the closed state.
     *
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int abort() throws LibvirtException {
        int returnValue = processError(libvirt.virStreamAbort(vsp));
        this.state = CLOSED;
        return returnValue;
    }

    /**
     * Register a callback to be notified when a stream becomes writable, or
     * readable.
     *
     * @see <a
     *      href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamEventAddCallback">Libvirt
     *      Docs</a>
     * @param events
     *            the events to monitor
     * @param cb
     *            the callback method
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int addCallback(final int events, final Libvirt.VirStreamEventCallback cb)
        throws LibvirtException {
        return processError(libvirt.virStreamEventAddCallback(vsp, events, cb, null, null));
    }

    @Override
    protected void finalize() throws LibvirtException {
        free();
    }

    /**
     * Indicate that there is no further data to be transmitted on the
     * stream.
     * <p>
     * On success the stream is marked as having reached EOF.
     *
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int finish() throws LibvirtException {
        int returnValue = processError(libvirt.virStreamFinish(vsp));
        markEOF();
        return returnValue;
    }

    /**
     * Decrement the reference count on a stream, releasing the stream object if
     * the reference count has hit zero.
     * <p>
     * The stream is closed first. The native reference is released even
     * if closing fails, so that a failing finish/abort does not leak the
     * native stream object.
     *
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int free() throws LibvirtException {
        int success = 0;
        if (vsp != null) {
            try {
                closeStream();
            } finally {
                // Once the native reference is dropped the stream must not
                // be used for I/O any more, whatever closeStream() did.
                this.state = CLOSED;
                success = processError(libvirt.virStreamFree(vsp));
                vsp = null;
            }
        }

        return success;
    }

    StreamPointer getVsp() {
        return vsp;
    }

    /**
     * Receives data from the stream into the buffer provided.
     *
     * @param data
     *            buffer to put the data into
     * @return the number of bytes read, -1 on error, -2 if the buffer is empty
     * @throws LibvirtException
     */
    public int receive(final byte[] data) throws LibvirtException {
        return receive(ByteBuffer.wrap(data));
    }

    /**
     * Receives data from the stream into the remaining space of the
     * given buffer and advances the buffer's position by the number of
     * bytes received.
     *
     * @param buffer
     *            buffer to put the data into
     * @return the value returned by virStreamRecv (after error processing)
     * @throws LibvirtException
     */
    protected int receive(final ByteBuffer buffer) throws LibvirtException {
        int returnValue = processError(libvirt.virStreamRecv(vsp, buffer, new SizeT(buffer.remaining())));
        // Only a positive value is a byte count. Negative sentinels (e.g. -2)
        // must not be applied to the position: that would rewind the buffer
        // or raise IllegalArgumentException.
        if (returnValue > 0) {
            buffer.position(buffer.position() + returnValue);
        }
        return returnValue;
    }

    /**
     * Reads bytes from this stream into the given buffer.
     *
     * @param buffer
     *            the buffer to read into
     * @return the number of bytes read, possibly zero if the buffer has
     *         no space left, or -1 if the end of the stream was reached
     * @throws ClosedChannelException
     *             if the stream is not open
     * @throws NonReadableChannelException
     *             if the stream was not opened for reading
     * @throws IOException
     *             if the underlying libvirt call fails
     */
    @Override
    public int read(final ByteBuffer buffer) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }

        if (!isReadable()) {
            throw new NonReadableChannelException();
        }

        if (isEOF()) {
            return -1;
        }

        // A full buffer would request zero bytes; the resulting 0 must not
        // be mistaken for end-of-stream (which would finish the stream).
        if (!buffer.hasRemaining()) {
            return 0;
        }

        try {
            int ret = receive(buffer);

            switch (ret) {
            case 0:
                finish();
                return -1;

            case -2:
                throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");

            default:
                return ret;
            }
        } catch (LibvirtException e) {
            throw new IOException("could not read from stream", e);
        }
    }

    /**
     * Writes all remaining bytes of the given buffer to this stream.
     *
     * @param buffer
     *            the buffer holding the data to write
     * @return the number of bytes written
     * @throws ClosedChannelException
     *             if the stream is not open
     * @throws NonWritableChannelException
     *             if the stream was not opened for writing
     * @throws IOException
     *             if the underlying libvirt call fails
     */
    @Override
    public int write(final ByteBuffer buffer) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }

        if (!isWritable()) {
            throw new NonWritableChannelException();
        }

        int pos = buffer.position();

        try {
            while (buffer.hasRemaining()) {
                int ret = send(buffer);

                if (ret == -2) {
                    throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
                }
            }
            return buffer.position() - pos;
        } catch (LibvirtException e) {
            throw new IOException("could not write to stream", e);
        }
    }

    /**
     * Closes the stream: an open stream that has not reached EOF is
     * finished when it is writable, or aborted when it is readable.
     * Afterwards the state is set to closed.
     *
     * @throws LibvirtException
     */
    protected void closeStream() throws LibvirtException {
        if (isOpen() && !isEOF()) {
            if (isWritable()) {
                finish();
            } else if (isReadable()) {
                abort();
            }
        }
        this.state = CLOSED;
    }

    @Override
    public void close() throws IOException {
        try {
            closeStream();
        } catch (LibvirtException e) {
            throw new IOException("error while closing Stream", e);
        }
    }

    @Override
    public boolean isOpen() {
        return (this.state & OPEN) != 0;
    }

    /**
     * Batch receive method
     *
     * @see <a href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamRecvAll">virStreamRecvAll</a>
     * @param handler
     *            the callback handler
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int receiveAll(final Libvirt.VirStreamSinkFunc handler)
        throws LibvirtException {
        return processError(libvirt.virStreamRecvAll(vsp, handler, null));
    }

    /**
     * Remove an event callback from the stream
     *
     * @see <a href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamEventRemoveCallback">Libvirt Docs</a>
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int removeCallback() throws LibvirtException {
        return processError(libvirt.virStreamEventRemoveCallback(vsp));
    }

    /**
     * Write a series of bytes to the stream.
     *
     * @param data
     *            the data to write
     * @return the number of bytes written, -1 on error, -2 if the buffer is
     *         full
     * @throws LibvirtException
     */
    public int send(final byte[] data) throws LibvirtException {
        return send(ByteBuffer.wrap(data));
    }

    /**
     * Sends the remaining bytes of the given buffer to the stream and
     * advances the buffer's position by the number of bytes sent.
     *
     * @param buffer
     *            the buffer holding the data to write
     * @return the value returned by virStreamSend (after error processing)
     * @throws LibvirtException
     */
    protected int send(final ByteBuffer buffer) throws LibvirtException {
        SizeT size = new SizeT(buffer.remaining());
        int returnValue = processError(libvirt.virStreamSend(vsp, buffer, size));
        // Only a positive value is a byte count. Negative sentinels (e.g. -2)
        // must not be applied to the position.
        if (returnValue > 0) {
            buffer.position(buffer.position() + returnValue);
        }
        return returnValue;
    }

    /**
     * Batch send method
     *
     * @see <a
     *      href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamSendAll">Libvirt
     *      Documentation</a>
     * @param handler
     *            the callback handler
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int sendAll(final Libvirt.VirStreamSourceFunc handler)
        throws LibvirtException {
        return processError(libvirt.virStreamSendAll(vsp, handler, null));
    }

    /**
     * Changes the set of events to monitor for a stream.
     *
     * @see <a href="http://www.libvirt.org/html/libvirt-libvirt.html#virStreamEventUpdateCallback">Libvirt Docs</a>
     * @param events
     *            the events to monitor
     * @return <em>ignore</em> (always 0)
     * @throws LibvirtException
     */
    public int updateCallback(final int events) throws LibvirtException {
        return processError(libvirt.virStreamEventUpdateCallback(vsp, events));
    }
}