def __del__(self) -> None:
    """Release the underlying stream when the wrapper is collected.

    Removes the event callback if one was recorded on ``self.cb``, then
    frees the native handle and clears ``self._o``.
    """
    try:
        if self.cb:
            libvirtmod.virStreamEventRemoveCallback(self._o)
    except AttributeError:
        # In this section self.cb is only assigned by eventAddCallback,
        # so it may legitimately be missing here.
        pass

    # getattr() keeps the finalizer quiet on a partially constructed
    # instance that never got its _o attribute.
    if getattr(self, "_o", None) is not None:
        libvirtmod.virStreamFree(self._o)
    self._o = None


def _dispatchStreamEventCallback(self, events: int, cbData: Dict[str, Any]) -> int:
    """
    Dispatches events to python user's stream event callbacks

    cbData is the dictionary built by eventAddCallback; its "cb" entry is
    invoked as cb(stream, events, opaque). Always returns 0.
    """
    cb = cbData["cb"]
    opaque = cbData["opaque"]

    cb(self, events, opaque)
    return 0


def eventAddCallback(self, events: int, cb: Callable[['virStream', int, _T], None], opaque: _T) -> None:
    """Register cb to be called with opaque for the given stream events.

    The callback is remembered on ``self.cb`` so that __del__ can remove
    it again. Raises libvirtError if the registration returns -1.
    """
    self.cb = cb
    cbData = {"stream": self, "cb": cb, "opaque": opaque}
    ret = libvirtmod.virStreamEventAddCallback(self._o, events, cbData)
    if ret == -1:
        raise libvirtError('virStreamEventAddCallback() failed')


def recvAll(self, handler: Callable[['virStream', bytes, _T], int], opaque: _T) -> None:
    """Receive the entire data stream, sending the data to the
    requested data sink. This is simply a convenient alternative
    to virStreamRecv, for apps that do blocking-I/O.

    A hypothetical handler function looks like:

        def handler(stream,  # virStream instance
                    buf,     # bytes containing received data
                    opaque): # extra data passed to recvAll as opaque
            fd = opaque
            return os.write(fd, buf)

    If the handler raises, or returns a negative integer, the stream
    is aborted (best effort) and the exception is propagated.
    """
    while True:
        got = self.recv(virStorageVol.streamBufSize)
        if got == -2:
            raise libvirtError("cannot use recvAll with "
                               "nonblocking stream")
        if len(got) == 0:
            break

        try:
            ret = handler(self, got, opaque)
            if isinstance(ret, int) and ret < 0:
                raise RuntimeError("recvAll handler returned %d" % ret)
        except BaseException:
            try:
                self.abort()
            except Exception:
                # A failing abort must not mask the handler's error.
                pass
            raise


def sendAll(self, handler: Callable[['virStream', int, _T], bytes], opaque: _T) -> None:
    """
    Send the entire data stream, reading the data from the
    requested data source. This is simply a convenient alternative
    to virStreamSend, for apps that do blocking-I/O.

    A hypothetical handler function looks like:

        def handler(stream,  # virStream instance
                    nbytes,  # int amt of data to read
                    opaque): # extra data passed to sendAll as opaque
            fd = opaque
            return os.read(fd, nbytes)

    If the handler raises, the stream is aborted (best effort) and the
    exception is propagated. An empty/falsy result ends the transfer.
    """
    while True:
        try:
            got = handler(self, virStorageVol.streamBufSize, opaque)
        except BaseException:
            try:
                self.abort()
            except Exception:
                # A failing abort must not mask the handler's error.
                pass
            raise

        if not got:
            break

        ret = self.send(got)
        if ret == -2:
            raise libvirtError("cannot use sendAll with "
                               "nonblocking stream")


def recv(self, nbytes: int) -> bytes:
    """Reads a series of bytes from the stream. This method may
    block the calling application for an arbitrary amount
    of time.

    Errors are not guaranteed to be reported synchronously
    with the call, but may instead be delayed until a
    subsequent call.

    On success, the received data is returned. On failure, an
    exception is raised. If the stream is a NONBLOCK stream and
    the request would block, integer -2 is returned.
    """
    ret = libvirtmod.virStreamRecv(self._o, nbytes)
    if ret is None:
        raise libvirtError('virStreamRecv() failed')
    return ret


def send(self, data: bytes) -> int:
    """Write a series of bytes to the stream. This method may
    block the calling application for an arbitrary amount
    of time. Once an application has finished sending data
    it should call virStreamFinish to wait for successful
    confirmation from the driver, or detect any error

    This method may not be used if a stream source has been
    registered

    Errors are not guaranteed to be reported synchronously
    with the call, but may instead be delayed until a
    subsequent call.

    Raises libvirtError when the underlying call returns -1; any
    other return value is passed through to the caller.
    """
    ret = libvirtmod.virStreamSend(self._o, data)
    if ret == -1:
        raise libvirtError('virStreamSend() failed')
    return ret


def recvHole(self, flags: int = 0) -> int:
    """This method is used to determine the length in bytes
    of the empty space to be created in a stream's target
    file when uploading or downloading sparsely populated
    files. This is the counterpart to sendHole.

    Raises libvirtError when the underlying call returns None.
    """
    ret = libvirtmod.virStreamRecvHole(self._o, flags)
    if ret is None:
        raise libvirtError('virStreamRecvHole() failed')
    return ret


def sendHole(self, length: int, flags: int = 0) -> int:
    """Rather than transmitting empty file space, this method
    directs the stream target to create length bytes of empty
    space.  This method would be used when uploading or
    downloading sparsely populated files to avoid the
    needless copy of empty file space.

    Raises libvirtError when the underlying call returns -1.
    """
    ret = libvirtmod.virStreamSendHole(self._o, length, flags)
    if ret == -1:
        raise libvirtError('virStreamSendHole() failed')
    return ret


def recvFlags(self, nbytes: int, flags: int = 0) -> Union[bytes, int]:
    """Reads a series of bytes from the stream. This method may
    block the calling application for an arbitrary amount
    of time. This is just like recv except it has flags
    argument.

    Errors are not guaranteed to be reported synchronously
    with the call, but may instead be delayed until a
    subsequent call.

    On success, the received data is returned. On failure, an
    exception is raised. If the stream is a NONBLOCK stream and
    the request would block, integer -2 is returned.
    """
    ret = libvirtmod.virStreamRecvFlags(self._o, nbytes, flags)
    if ret is None:
        raise libvirtError('virStreamRecvFlags() failed')
    return ret


def sparseRecvAll(self, handler: Callable[['virStream', bytes, _T], Union[bytes, int]], holeHandler: Callable[['virStream', int, _T], Optional[int]], opaque: _T) -> None:
    """Receive the entire data stream, sending the data to
    the requested data sink handler and calling the skip
    holeHandler to generate holes for sparse stream targets.
    This is simply a convenient alternative to recvFlags, for
    apps that do blocking-I/O and want to preserve sparseness.

    Hypothetical callbacks can look like this:

        def handler(stream,  # virStream instance
                    buf,     # bytes containing received data
                    opaque): # extra data passed to sparseRecvAll as opaque
            fd = opaque
            return os.write(fd, buf)

        def holeHandler(stream,  # virStream instance
                        length,  # number of bytes to skip
                        opaque): # extra data passed to sparseRecvAll as opaque
            fd = opaque
            cur = os.lseek(fd, length, os.SEEK_CUR)
            return os.ftruncate(fd, cur)  # take this extra step to
                                          # actually allocate the hole

    A negative integer returned by either callback aborts the stream
    and raises RuntimeError.
    """
    while True:
        want = virStorageVol.streamBufSize
        got = self.recvFlags(want, VIR_STREAM_RECV_STOP_AT_HOLE)
        if got == -2:
            raise libvirtError("cannot use sparseRecvAll with "
                               "nonblocking stream")
        elif got == -3:
            # -3 is handled here as "a hole starts at this position".
            length = self.recvHole()
            if length is None:
                # Defensive only: recvHole as defined alongside this
                # method raises instead of returning None.
                self.abort()
                raise RuntimeError("recvHole handler failed")
            ret_hole = holeHandler(self, length, opaque)
            if isinstance(ret_hole, int) and ret_hole < 0:
                self.abort()
                raise RuntimeError("holeHandler handler returned %d" % ret_hole)
            continue
        elif isinstance(got, int):
            raise ValueError(got)
        elif not isinstance(got, bytes):
            raise TypeError(type(got))

        if len(got) == 0:
            break

        ret_data = handler(self, got, opaque)
        if isinstance(ret_data, int) and ret_data < 0:
            self.abort()
            raise RuntimeError("sparseRecvAll handler returned %d" % ret_data)


def sparseSendAll(self, handler: Callable[['virStream', int, _T], Union[bytes, int]], holeHandler: Callable[['virStream', _T], Tuple[bool, int]], skipHandler: Callable[['virStream', int, _T], int], opaque: _T) -> None:
    """Send the entire data stream, reading the data from the
    requested data source. This is simply a convenient
    alternative to virStreamSend, for apps that do
    blocking-I/O and want to preserve sparseness.

    Hypothetical callbacks can look like this:

        def handler(stream,  # virStream instance
                    nbytes,  # int amt of data to read
                    opaque): # extra data passed to sparseSendAll as opaque
            fd = opaque
            return os.read(fd, nbytes)

        def holeHandler(stream,  # virStream instance
                        opaque): # extra data passed to sparseSendAll as opaque
            fd = opaque
            cur = os.lseek(fd, 0, os.SEEK_CUR)
            # ... find out current section and its boundaries
            # and set inData = True/False and sectionLen correspondingly
            os.lseek(fd, cur, os.SEEK_SET)
            return [inData, sectionLen]

        def skipHandler(stream,  # virStream instance
                        length,  # number of bytes to skip
                        opaque): # extra data passed to sparseSendAll as opaque
            fd = opaque
            return os.lseek(fd, length, os.SEEK_CUR)

    A negative result from handler, sendHole or skipHandler aborts the
    stream and raises RuntimeError.
    """
    while True:
        [inData, sectionLen] = holeHandler(self, opaque)
        if not inData and sectionLen > 0:
            if (self.sendHole(sectionLen) < 0 or
                    skipHandler(self, sectionLen, opaque) < 0):
                self.abort()
                # The stream is unusable after abort(); stop instead of
                # looping on it again.
                raise RuntimeError("sparseSendAll failed to send or skip a hole")
            continue

        # Never read past the end of the current data section.
        want = virStorageVol.streamBufSize
        if want > sectionLen:
            want = sectionLen

        got = handler(self, want, opaque)
        if isinstance(got, int) and got < 0:
            self.abort()
            raise RuntimeError("sparseSendAll handler returned %d" % got)

        if not got:
            break

        assert isinstance(got, bytes)
        ret = self.send(got)
        if ret == -2:
            raise libvirtError("cannot use sparseSendAll with "
                               "nonblocking stream")