# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Classes defining read and write handles for image transfer.

This module defines various classes for reading and writing files including
VMDK files in VMware servers. It also contains a class to read images from
glance server.
"""

import logging
import ssl
import time

from oslo_utils import excutils
from oslo_utils import netutils
import requests
import urllib.parse as urlparse
from urllib3 import connection as httplib

from oslo_vmware._i18n import _
from oslo_vmware import exceptions
from oslo_vmware import vim_util


LOG = logging.getLogger(__name__)

# Minimum change (in percentage points) between two logged progress values.
MIN_PROGRESS_DIFF_TO_LOG = 25
# Minimum number of seconds between two progress updates sent to the lease.
MIN_UPDATE_INTERVAL = 60
# Default chunk size, in bytes, used by VmdkReadHandle.read().
READ_CHUNKSIZE = 65536
USER_AGENT = 'OpenStack-ESX-Adapter'


class FileHandle(object):
    """Base class for VMware server file (including VMDK) access over HTTP.

    This class wraps a backing file handle and provides utility methods
    for various sub-classes.
    """

    def __init__(self, file_handle):
        """Initializes the file handle.

        :param file_handle: backing file handle
        """
        # NOTE(review): _eof is not read anywhere in this module; it is kept
        # because code outside this module may rely on it.
        self._eof = False
        self._file_handle = file_handle

    def _create_connection(self, url, method, cacerts=False,
                           ssl_thumbprint=None):
        """Create a connection for the URL and start the request.

        The request line is sent with ``putrequest``; adding headers and
        calling ``endheaders`` is left to the caller.

        :param url: URL to connect to; the scheme must be http or https
        :param method: HTTP method for the request
        :param cacerts: True/False to enable/disable certificate
                        verification, or the CA certificates to verify with
        :param ssl_thumbprint: expected fingerprint of the server certificate
        :returns: the connection on which the request has been started
        :raises: ValueError if the URL scheme is neither http nor https
        """
        parsed = urlparse.urlparse(url)
        scheme = parsed.scheme
        if scheme == 'http':
            conn = httplib.HTTPConnection(parsed.netloc)
        elif scheme == 'https':
            conn = httplib.HTTPSConnection(parsed.netloc)
            cert_reqs = None

            # cacerts can be either True or False or contain
            # actual certificates. If it is a boolean, then
            # we need to set cert_reqs and clear the cacerts
            if isinstance(cacerts, bool):
                if cacerts:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                cacerts = requests.certs.where()
            conn.set_cert(ca_certs=cacerts, cert_reqs=cert_reqs,
                          assert_fingerprint=ssl_thumbprint)
        else:
            excep_msg = _("Invalid scheme: %s.") % scheme
            LOG.error(excep_msg)
            raise ValueError(excep_msg)

        path = parsed.path
        if parsed.query:
            path = path + '?' + parsed.query
        conn.putrequest(method, path)
        return conn

    def _create_read_connection(self, url, cookies=None, cacerts=False,
                                ssl_thumbprint=None):
        """Open the URL with a GET request and return the HTTP response.

        :param url: URL to read from
        :param cookies: cookies to build the vim cookie header; may be None
        :param cacerts: see _create_connection
        :param ssl_thumbprint: see _create_connection
        :returns: the response object returned by the connection
        :raises: VimException on any failure while opening the URL
        """
        LOG.debug("Opening URL: %s for reading.", url)
        try:
            conn = self._create_connection(url, 'GET', cacerts, ssl_thumbprint)
            vim_cookie = self._build_vim_cookie_header(cookies)
            conn.putheader('User-Agent', USER_AGENT)
            # Skip the header when there is no cookie to send, like the
            # write connection does.
            if vim_cookie:
                conn.putheader('Cookie', vim_cookie)
            conn.endheaders()
            return conn.getresponse()
        except Exception as excep:
            # TODO(vbala) We need to catch and raise specific exceptions
            # related to connection problems, invalid request and invalid
            # arguments.
            excep_msg = _("Error occurred while opening URL: %s for "
                          "reading.") % url
            LOG.exception(excep_msg)
            raise exceptions.VimException(excep_msg, excep)

    def _create_write_connection(self, method, url,
                                 file_size=None,
                                 cookies=None,
                                 overwrite=None,
                                 content_type=None,
                                 cacerts=False,
                                 ssl_thumbprint=None):
        """Create HTTP connection to write to VMDK file.

        Optional headers (Content-Length, Overwrite, Cookie, Content-Type)
        are sent only when the corresponding argument is truthy.

        :param method: HTTP method for the request
        :param url: URL to write to
        :param file_size: number of bytes that will be written
        :param cookies: cookies to build the vim cookie header
        :param overwrite: value of the Overwrite header
        :param content_type: value of the Content-Type header
        :param cacerts: see _create_connection
        :param ssl_thumbprint: see _create_connection
        :returns: the connection, with headers sent, ready to send the body
        :raises: VimConnectionException, ValueError
        """
        # file_size defaults to None, so it must not be formatted with %d.
        LOG.debug("Creating HTTP connection to write to file with "
                  "size = %(file_size)s and URL = %(url)s.",
                  {'file_size': file_size,
                   'url': url})
        try:
            conn = self._create_connection(url, method, cacerts,
                                           ssl_thumbprint)
            headers = {'User-Agent': USER_AGENT}
            if file_size:
                headers['Content-Length'] = str(file_size)
            if overwrite:
                headers['Overwrite'] = overwrite
            if cookies:
                headers['Cookie'] = self._build_vim_cookie_header(cookies)
            if content_type:
                headers['Content-Type'] = content_type
            for key, value in headers.items():
                conn.putheader(key, value)
            conn.endheaders()
            return conn
        except requests.RequestException as excep:
            excep_msg = _("Error occurred while creating HTTP connection "
                          "to write to VMDK file with URL = %s.") % url
            LOG.exception(excep_msg)
            raise exceptions.VimConnectionException(excep_msg, excep)

    def close(self):
        """Close the file handle.

        Errors raised by the backing handle are logged and suppressed.
        """
        try:
            self._file_handle.close()
        except Exception:
            LOG.warning("Error occurred while closing the file handle",
                        exc_info=True)

    def _build_vim_cookie_header(self, vim_cookies):
        """Build ESX host session cookie header.

        Only the first cookie is used.

        :param vim_cookies: iterable of cookies having ``name`` and ``value``
                            attributes; may be None or empty
        :returns: '<name>=<value>' of the first cookie, or an empty string
                  when there is no cookie
        """
        if vim_cookies is None:
            return ""
        for vim_cookie in vim_cookies:
            return vim_cookie.name + '=' + vim_cookie.value
        return ""

    def write(self, data):
        """Write data to the file.

        :param data: data to be written
        :raises: NotImplementedError
        """
        raise NotImplementedError()

    def read(self, chunk_size):
        """Read a chunk of data.

        :param chunk_size: read chunk size
        :raises: NotImplementedError
        """
        raise NotImplementedError()

    def tell(self):
        """Get the position of File Handle

        :return: position
        :raises: NotImplementedError
        """
        raise NotImplementedError()

    def fileno(self):
        """returns the integer file descriptor

        by default this is not supported and raises IOError
        """
        raise IOError()

    def seek(self, offset):
        """sets the file's current position at the offset

        This is a NOP in the base class.

        :param offset: offset
        """
        pass

    def flush(self):
        """flushes the internal buffer

        This is a NOP in the base class.
        """
        pass

    def get_size(self):
        """Get size of the file to be read.

        :raises: NotImplementedError
        """
        raise NotImplementedError()

    def _get_soap_url(self, scheme, host, port):
        """Returns the IPv4/v6 compatible SOAP URL for the given host."""
        if netutils.is_valid_ipv6(host):
            # IPv6 literals must be bracketed when followed by a port.
            return '%s://[%s]:%d' % (scheme, host, port)
        return '%s://%s:%d' % (scheme, host, port)


class FileWriteHandle(FileHandle):
    """Write handle for a file in VMware server."""

    def __init__(self, host, port, data_center_name, datastore_name, cookies,
                 file_path, file_size, scheme='https', cacerts=False,
                 thumbprint=None):
        """Initializes the write handle with given parameters.

        :param host: ESX/VC server IP address or host name
        :param port: port for connection
        :param data_center_name: name of the data center in the case of a VC
                                 server
        :param datastore_name: name of the datastore where the file is stored
        :param cookies: cookies to build the vim cookie header
        :param file_path: datastore path where the file is written
        :param file_size: size of the file in bytes
        :param scheme: protocol-- http or https
        :param cacerts: CA bundle file to use for SSL verification
        :param thumbprint: expected SHA1 thumbprint of server's certificate
        :raises: VimConnectionException, ValueError
        """
        soap_url = self._get_soap_url(scheme, host, port)
        param_list = {'dcPath': data_center_name, 'dsName': datastore_name}
        self._url = '%s/folder/%s' % (soap_url, file_path)
        self._url = self._url + '?' + urlparse.urlencode(param_list)

        self._conn = self._create_write_connection('PUT',
                                                   self._url,
                                                   file_size,
                                                   cookies=cookies,
                                                   cacerts=cacerts,
                                                   ssl_thumbprint=thumbprint)
        super(FileWriteHandle, self).__init__(self._conn)

    def write(self, data):
        """Write data to the file.

        :param data: data to be written
        :raises: VimConnectionException, VimException
        """
        try:
            self._file_handle.send(data)
        except requests.RequestException as excep:
            excep_msg = _("Connection error occurred while writing data to"
                          " %s.") % self._url
            LOG.exception(excep_msg)
            raise exceptions.VimConnectionException(excep_msg, excep)
        except Exception as excep:
            # TODO(vbala) We need to catch and raise specific exceptions
            # related to connection problems, invalid request and invalid
            # arguments.
            excep_msg = _("Error occurred while writing data to"
                          " %s.") % self._url
            LOG.exception(excep_msg)
            raise exceptions.VimException(excep_msg, excep)

    def close(self):
        """Get the response and close the connection.

        A failure to read the response is logged and does not prevent the
        connection from being closed.
        """
        LOG.debug("Closing write handle for %s.", self._url)
        try:
            self._conn.getresponse()
        except Exception:
            LOG.warning("Error occurred while reading the HTTP response.",
                        exc_info=True)
        super(FileWriteHandle, self).close()

    def __str__(self):
        return "File write handle for %s" % self._url


class VmdkHandle(FileHandle):
    """VMDK handle based on HttpNfcLease."""

    def __init__(self, session, lease, url, file_handle):
        """Initializes the VMDK handle.

        :param session: API session used to invoke lease operations
        :param lease: the HttpNfcLease backing this transfer
        :param url: URL of the VMDK being transferred
        :param file_handle: backing file handle
        """
        self._session = session
        self._lease = lease
        self._url = url
        self._last_logged_progress = 0
        # NOTE: the attribute name is misspelled ("udpate"); it is kept
        # as-is because code outside this module may reference it.
        self._last_progress_udpate = 0

        super(VmdkHandle, self).__init__(file_handle)

    def _log_progress(self, progress):
        """Log data transfer progress.

        Progress is logged when it reaches 100 or has advanced by at least
        MIN_PROGRESS_DIFF_TO_LOG since the last logged value.
        """
        if (progress == 100 or (progress - self._last_logged_progress >=
                                MIN_PROGRESS_DIFF_TO_LOG)):
            LOG.debug("Data transfer progress is %d%%.", progress)
            self._last_logged_progress = progress

    def _get_progress(self):
        """Get current progress for updating progress to lease.

        The base implementation returns None; the read and write handles
        override it to return a percentage.
        """
        pass

    def update_progress(self):
        """Updates progress to lease.

        This call back to the lease is essential to keep the lease alive
        across long running write/read operations.

        The call is a NOP if less than MIN_UPDATE_INTERVAL seconds have
        elapsed since the previous update.

        :raises: VimException, VimFaultException, VimAttributeException,
                 VimSessionOverLoadException, VimConnectionException
        """
        now = time.time()
        if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL):
            return
        self._last_progress_udpate = now
        progress = int(self._get_progress())
        self._log_progress(progress)

        try:
            self._session.invoke_api(self._session.vim,
                                     'HttpNfcLeaseProgress',
                                     self._lease,
                                     percent=progress)
        except exceptions.VimException:
            with excutils.save_and_reraise_exception():
                LOG.exception("Error occurred while updating the "
                              "write/read progress of VMDK file "
                              "with URL = %s.",
                              self._url)

    def _release_lease(self):
        """Release the lease

        The lease is aborted if the transfer is incomplete, completed if it
        is in the 'ready' state, and left untouched otherwise.

        :raises: VimException, VimFaultException, VimAttributeException,
                 VimSessionOverLoadException, VimConnectionException
        """
        LOG.debug("Getting lease state for %s.", self._url)

        state = self._session.invoke_api(vim_util,
                                         'get_object_property',
                                         self._session.vim,
                                         self._lease,
                                         'state')
        LOG.debug("Lease for %(url)s is in state: %(state)s.",
                  {'url': self._url,
                   'state': state})
        if self._get_progress() < 100:
            LOG.error("Aborting lease for %s due to incomplete transfer.",
                      self._url)
            self._session.invoke_api(self._session.vim,
                                     'HttpNfcLeaseAbort',
                                     self._lease)
        elif state == 'ready':
            LOG.debug("Releasing lease for %s.", self._url)
            self._session.invoke_api(self._session.vim,
                                     'HttpNfcLeaseComplete',
                                     self._lease)
        else:
            LOG.debug("Lease for %(url)s is in state: %(state)s; no "
                      "need to release.",
                      {'url': self._url,
                       'state': state})

    @staticmethod
    def _create_import_vapp_lease(session, rp_ref, import_spec, vm_folder_ref):
        """Create and wait for HttpNfcLease lease for vApp import.

        :returns: tuple of the lease and its 'info' property
        """
        LOG.debug("Creating HttpNfcLease lease for vApp import into resource"
                  " pool: %s.",
                  rp_ref)
        lease = session.invoke_api(session.vim,
                                   'ImportVApp',
                                   rp_ref,
                                   spec=import_spec,
                                   folder=vm_folder_ref)
        LOG.debug("Lease: %(lease)s obtained for vApp import into resource"
                  " pool %(rp_ref)s.",
                  {'lease': lease,
                   'rp_ref': rp_ref})
        session.wait_for_lease_ready(lease)

        LOG.debug("Invoking VIM API for reading info of lease: %s.", lease)
        lease_info = session.invoke_api(vim_util,
                                        'get_object_property',
                                        session.vim,
                                        lease,
                                        'info')
        return lease, lease_info

    @staticmethod
    def _create_export_vm_lease(session, vm_ref):
        """Create and wait for HttpNfcLease lease for VM export.

        :returns: tuple of the lease and its 'info' property
        """
        LOG.debug("Creating HttpNfcLease lease for exporting VM: %s.",
                  vm_ref)
        lease = session.invoke_api(session.vim, 'ExportVm', vm_ref)
        LOG.debug("Lease: %(lease)s obtained for exporting VM: %(vm_ref)s.",
                  {'lease': lease,
                   'vm_ref': vm_ref})
        session.wait_for_lease_ready(lease)

        LOG.debug("Invoking VIM API for reading info of lease: %s.", lease)
        lease_info = session.invoke_api(vim_util,
                                        'get_object_property',
                                        session.vim,
                                        lease,
                                        'info')
        return lease, lease_info

    @staticmethod
    def _fix_esx_url(url, host, port):
        """Fix netloc in the case of an ESX host.

        In the case of an ESX host, the netloc is set to '*' in the URL
        returned in HttpNfcLeaseInfo. It should be replaced with host name
        or IP address.

        :returns: the URL with a '*' netloc replaced by host:port; any other
                  URL is returned unchanged
        """
        urlp = urlparse.urlparse(url)
        if urlp.netloc != '*':
            return url

        if netutils.is_valid_ipv6(host):
            netloc = '[%s]:%d' % (host, port)
        else:
            netloc = "%s:%d" % (host, port)
        return urlparse.urlunparse(urlp._replace(netloc=netloc))

    @staticmethod
    def _find_vmdk_url(lease_info, host, port):
        """Find the URL corresponding to a VMDK file in lease info.

        The first device URL flagged as a disk is used.

        :returns: tuple of the VMDK URL and its SSL thumbprint
        :raises: VimException if the lease info has no disk URL
        """
        url = None
        ssl_thumbprint = None
        for deviceUrl in lease_info.deviceUrl:
            if deviceUrl.disk:
                url = VmdkHandle._fix_esx_url(deviceUrl.url, host, port)
                ssl_thumbprint = deviceUrl.sslThumbprint
                break
        if not url:
            excep_msg = _("Could not retrieve VMDK URL from lease info.")
            LOG.error(excep_msg)
            raise exceptions.VimException(excep_msg)
        LOG.debug("Found VMDK URL: %s from lease info.", url)
        return url, ssl_thumbprint


class VmdkWriteHandle(VmdkHandle):
    """VMDK write handle based on HttpNfcLease.

    This class creates a vApp in the specified resource pool and uploads the
    virtual disk contents.
    """

    def __init__(self, session, host, port, rp_ref, vm_folder_ref, import_spec,
                 vmdk_size, http_method='PUT'):
        """Initializes the VMDK write handle with input parameters.

        :param session: valid API session to ESX/VC server
        :param host: ESX/VC server IP address or host name
        :param port: port for connection
        :param rp_ref: resource pool into which the backing VM is imported
        :param vm_folder_ref: VM folder in ESX/VC inventory to use as parent
                              of backing VM
        :param import_spec: import specification of the backing VM
        :param vmdk_size: size of the backing VM's VMDK file
        :param http_method: either PUT or POST
        :raises: VimException, VimFaultException, VimAttributeException,
                 VimSessionOverLoadException, VimConnectionException,
                 ValueError
        """
        # Validate the method before the lease is created so that an invalid
        # argument does not leave behind a lease that nobody releases.
        if http_method == 'PUT':
            overwrite = 't'
            content_type = 'binary/octet-stream'
        elif http_method == 'POST':
            overwrite = None
            content_type = 'application/x-vnd.vmware-streamVmdk'
        else:
            raise ValueError('http_method must be either PUT or POST')

        self._vmdk_size = vmdk_size
        self._bytes_written = 0

        # Get lease and its info for vApp import
        lease, lease_info = self._create_import_vapp_lease(session,
                                                           rp_ref,
                                                           import_spec,
                                                           vm_folder_ref)

        # Find VMDK URL where data is to be written
        url, thumbprint = self._find_vmdk_url(lease_info, host, port)
        self._vm_ref = lease_info.entity

        cookies = session.vim.client.cookiejar
        # Create HTTP connection to write to VMDK URL
        self._conn = self._create_write_connection(http_method,
                                                   url,
                                                   vmdk_size,
                                                   cookies=cookies,
                                                   overwrite=overwrite,
                                                   content_type=content_type,
                                                   ssl_thumbprint=thumbprint)
        super(VmdkWriteHandle, self).__init__(session, lease, url, self._conn)

    def get_imported_vm(self):
        """Get managed object reference of the VM created for import.

        :returns: the 'entity' reference taken from the lease info
        :raises: ImageTransferException if the upload is incomplete
        """
        if self._get_progress() < 100:
            excep_msg = _("Incomplete VMDK upload to %s.") % self._url
            # No exception is being handled here, so LOG.exception would
            # attach a meaningless traceback.
            LOG.error(excep_msg)
            raise exceptions.ImageTransferException(excep_msg)
        return self._vm_ref

    def tell(self):
        """Get the number of bytes written so far."""
        return self._bytes_written

    def write(self, data):
        """Write data to the file.

        :param data: data to be written
        :raises: VimConnectionException, VimException
        """
        try:
            self._file_handle.send(data)
            self._bytes_written += len(data)
        except requests.RequestException as excep:
            excep_msg = _("Connection error occurred while writing data to"
                          " %s.") % self._url
            LOG.exception(excep_msg)
            raise exceptions.VimConnectionException(excep_msg, excep)
        except Exception as excep:
            # TODO(vbala) We need to catch and raise specific exceptions
            # related to connection problems, invalid request and invalid
            # arguments.
            excep_msg = _("Error occurred while writing data to"
                          " %s.") % self._url
            LOG.exception(excep_msg)
            raise exceptions.VimException(excep_msg, excep)

    def close(self):
        """Releases the lease and close the connection.

        A missing lease and any VimException raised while releasing the
        lease are logged and suppressed. The connection is closed in every
        case, including when an unexpected exception propagates.
        """
        try:
            self._release_lease()
        except exceptions.ManagedObjectNotFoundException:
            LOG.info("Lease for %(url)s not found. No need to release.",
                     {'url': self._url})
        except exceptions.VimException:
            LOG.warning("Error occurred while releasing the lease "
                        "for %s.",
                        self._url,
                        exc_info=True)
        finally:
            # Always close the connection; returning early without closing
            # it would leak the underlying socket.
            super(VmdkWriteHandle, self).close()
            LOG.debug("Closed VMDK write handle for %s.", self._url)

    def _get_progress(self):
        """Get the percentage of the VMDK written so far."""
        if self._vmdk_size == 0:
            # Nothing to transfer; avoid dividing by zero.
            return 100.0
        return float(self._bytes_written) / self._vmdk_size * 100

    def __str__(self):
        return "VMDK write handle for %s" % self._url


class VmdkReadHandle(VmdkHandle):
    """VMDK read handle based on HttpNfcLease."""

    def __init__(self, session, host, port, vm_ref, vmdk_path,
                 vmdk_size):
        """Initializes the VMDK read handle with the given parameters.

        During the read (export) operation, the VMDK file is converted to a
        stream-optimized sparse disk format. Therefore, the size of the VMDK
        file read may be smaller than the actual VMDK size.

        :param session: valid api session to ESX/VC server
        :param host: ESX/VC server IP address or host name
        :param port: port for connection
        :param vm_ref: managed object reference of the backing VM whose VMDK
                       is to be exported
        :param vmdk_path: path of the VMDK file to be exported
        :param vmdk_size: actual size of the VMDK file
        :raises: VimException, VimFaultException, VimAttributeException,
                 VimSessionOverLoadException, VimConnectionException
        """
        self._vmdk_size = vmdk_size
        self._bytes_read = 0

        # Obtain lease for VM export
        lease, lease_info = self._create_export_vm_lease(session, vm_ref)

        # find URL of the VMDK file to be read and open connection
        url, thumbprint = self._find_vmdk_url(lease_info, host, port)
        cookies = session.vim.client.cookiejar
        self._conn = self._create_read_connection(url,
                                                  cookies=cookies,
                                                  ssl_thumbprint=thumbprint)
        super(VmdkReadHandle, self).__init__(session, lease, url, self._conn)

    def read(self, chunk_size=READ_CHUNKSIZE):
        """Read a chunk of data from the VMDK file.

        :param chunk_size: size of read chunk
        :returns: the data
        :raises: VimException
        """
        try:
            data = self._file_handle.read(chunk_size)
            self._bytes_read += len(data)
            return data
        except Exception as excep:
            # TODO(vbala) We need to catch and raise specific exceptions
            # related to connection problems, invalid request and invalid
            # arguments.
            excep_msg = _("Error occurred while reading data from"
                          " %s.") % self._url
            LOG.exception(excep_msg)
            raise exceptions.VimException(excep_msg, excep)

    def tell(self):
        """Get the number of bytes read so far."""
        return self._bytes_read

    def close(self):
        """Releases the lease and close the connection.

        A missing lease is logged and ignored. The connection is closed in
        every case.

        :raises: VimException, VimFaultException, VimAttributeException,
                 VimSessionOverLoadException, VimConnectionException
        """
        try:
            self._release_lease()
        except exceptions.ManagedObjectNotFoundException:
            LOG.info("Lease for %(url)s not found. No need to release.",
                     {'url': self._url})
            return
        except exceptions.VimException:
            LOG.warning("Error occurred while releasing the lease "
                        "for %s.",
                        self._url,
                        exc_info=True)
            raise
        finally:
            super(VmdkReadHandle, self).close()
            LOG.debug("Closed VMDK read handle for %s.", self._url)

    def _get_progress(self):
        """Get the bytes read so far as a percentage of the VMDK size."""
        if self._vmdk_size == 0:
            # Nothing to transfer; avoid dividing by zero.
            return 100.0
        return float(self._bytes_read) / self._vmdk_size * 100

    def __str__(self):
        return "VMDK read handle for %s" % self._url


class ImageReadHandle(object):
    """Read handle for glance images."""

    def __init__(self, glance_read_iter):
        """Initializes the read handle with given parameters.

        :param glance_read_iter: iterator to read data from glance image
        """
        self._glance_read_iter = glance_read_iter
        self._iter = self.get_next()

    def read(self, chunk_size):
        """Read an item from the image data iterator.

        The input chunk size is ignored since the client ImageBodyIterator
        uses its own chunk size.

        :returns: the next item of the iterator, or an empty string once the
                  iterator is exhausted
        """
        try:
            return next(self._iter)
        except StopIteration:
            LOG.debug("Completed reading data from the image iterator.")
            # NOTE(review): this is a str even if the iterator yields bytes;
            # it is kept because callers may compare against "".
            return ""

    def get_next(self):
        """Get the next item from the image iterator."""
        for data in self._glance_read_iter:
            yield data

    def close(self):
        """Close the read handle.

        This is a NOP.
        """
        pass

    def __str__(self):
        return "Image read handle"