1# Copyright (c) 2014 VMware, Inc. 2# All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15 16""" 17Functions and classes for image transfer between ESX/VC & image service. 18""" 19 20import logging 21import tarfile 22 23from eventlet import timeout 24 25from oslo_utils import units 26from oslo_vmware._i18n import _ 27from oslo_vmware.common import loopingcall 28from oslo_vmware import constants 29from oslo_vmware import exceptions 30from oslo_vmware import image_util 31from oslo_vmware.objects import datastore as ds_obj 32from oslo_vmware import rw_handles 33from oslo_vmware import vim_util 34 35 36LOG = logging.getLogger(__name__) 37 38NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec. 39CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer 40 41 42def _create_progress_updater(handle): 43 if isinstance(handle, rw_handles.VmdkHandle): 44 updater = loopingcall.FixedIntervalLoopingCall(handle.update_progress) 45 updater.start(interval=NFC_LEASE_UPDATE_PERIOD) 46 return updater 47 48 49def _start_transfer(read_handle, write_handle, timeout_secs): 50 # read_handle/write_handle could be an NFC lease, so we need to 51 # periodically update its progress 52 read_updater = _create_progress_updater(read_handle) 53 write_updater = _create_progress_updater(write_handle) 54 55 timer = timeout.Timeout(timeout_secs) 56 try: 57 while True: 58 data = read_handle.read(CHUNK_SIZE) 59 if not data: 60 break 61 write_handle.write(data) 62 except timeout.Timeout as excep: 63 msg = (_('Timeout, read_handle: "%(src)s", write_handle: "%(dest)s"') % 64 {'src': read_handle, 65 'dest': write_handle}) 66 LOG.exception(msg) 67 raise exceptions.ImageTransferException(msg, excep) 68 except Exception as excep: 69 msg = (_('Error, read_handle: "%(src)s", write_handle: "%(dest)s"') % 70 {'src': read_handle, 71 'dest': write_handle}) 72 LOG.exception(msg) 73 raise exceptions.ImageTransferException(msg, excep) 74 finally: 75 timer.cancel() 76 if read_updater: 77 read_updater.stop() 78 if write_updater: 79 write_updater.stop() 80 read_handle.close() 81 write_handle.close() 82 83 84def download_image(image, image_meta, session, datastore, rel_path, 85 bypass=True, timeout_secs=7200): 86 """Transfer an image to a datastore. 87 88 :param image: file-like iterator 89 :param image_meta: image metadata 90 :param session: VMwareAPISession object 91 :param datastore: Datastore object 92 :param rel_path: path where the file will be stored in the datastore 93 :param bypass: if set to True, bypass vCenter to download the image 94 :param timeout_secs: time in seconds to wait for the xfer to complete 95 """ 96 image_size = int(image_meta['size']) 97 method = 'PUT' 98 if bypass: 99 hosts = datastore.get_connected_hosts(session) 100 host = ds_obj.Datastore.choose_host(hosts) 101 host_name = session.invoke_api(vim_util, 'get_object_property', 102 session.vim, host, 'name') 103 ds_url = datastore.build_url(session._scheme, host_name, rel_path, 104 constants.ESX_DATACENTER_PATH) 105 cookie = ds_url.get_transfer_ticket(session, method) 106 conn = ds_url.connect(method, image_size, cookie) 107 else: 108 ds_url = datastore.build_url(session._scheme, session._host, rel_path) 109 cookie = '%s=%s' % (constants.SOAP_COOKIE_KEY, 110 session.vim.get_http_cookie().strip("\"")) 111 conn = ds_url.connect(method, image_size, cookie) 112 conn.write = conn.send 113 114 read_handle = rw_handles.ImageReadHandle(image) 115 _start_transfer(read_handle, conn, timeout_secs) 116 117 118def download_flat_image(context, timeout_secs, image_service, image_id, 119 **kwargs): 120 """Download flat image from the image service to VMware server. 121 122 :param context: image service write context 123 :param timeout_secs: time in seconds to wait for the download to complete 124 :param image_service: image service handle 125 :param image_id: ID of the image to be downloaded 126 :param kwargs: keyword arguments to configure the destination 127 file write handle 128 :raises: VimConnectionException, ImageTransferException, ValueError 129 """ 130 LOG.debug("Downloading image: %s from image service as a flat file.", 131 image_id) 132 133 # TODO(vbala) catch specific exceptions raised by download call 134 read_iter = image_service.download(context, image_id) 135 read_handle = rw_handles.ImageReadHandle(read_iter) 136 file_size = int(kwargs.get('image_size')) 137 write_handle = rw_handles.FileWriteHandle(kwargs.get('host'), 138 kwargs.get('port'), 139 kwargs.get('data_center_name'), 140 kwargs.get('datastore_name'), 141 kwargs.get('cookies'), 142 kwargs.get('file_path'), 143 file_size, 144 cacerts=kwargs.get('cacerts')) 145 _start_transfer(read_handle, write_handle, timeout_secs) 146 LOG.debug("Downloaded image: %s from image service as a flat file.", 147 image_id) 148 149 150def download_file( 151 read_handle, host, port, dc_name, ds_name, cookies, 152 upload_file_path, file_size, cacerts, timeout_secs): 153 """Download file to VMware server. 154 155 :param read_handle: file read handle 156 :param host: VMware server host name or IP address 157 :param port: VMware server port number 158 :param dc_name: name of the datacenter which contains the destination 159 datastore 160 :param ds_name: name of the destination datastore 161 :param cookies: cookies to build the cookie header while establishing 162 http connection with VMware server 163 :param upload_file_path: destination datastore file path 164 :param file_size: source file size 165 :param cacerts: CA bundle file to use for SSL verification 166 :param timeout_secs: timeout in seconds to wait for the download to 167 complete 168 """ 169 write_handle = rw_handles.FileWriteHandle(host, 170 port, 171 dc_name, 172 ds_name, 173 cookies, 174 upload_file_path, 175 file_size, 176 cacerts=cacerts) 177 _start_transfer(read_handle, write_handle, timeout_secs) 178 179 180def download_stream_optimized_data(context, timeout_secs, read_handle, 181 **kwargs): 182 """Download stream optimized data to VMware server. 183 184 :param context: image service write context 185 :param timeout_secs: time in seconds to wait for the download to complete 186 :param read_handle: handle from which to read the image data 187 :param kwargs: keyword arguments to configure the destination 188 VMDK write handle 189 :returns: managed object reference of the VM created for import to VMware 190 server 191 :raises: VimException, VimFaultException, VimAttributeException, 192 VimSessionOverLoadException, VimConnectionException, 193 ImageTransferException, ValueError 194 """ 195 file_size = int(kwargs.get('image_size')) 196 write_handle = rw_handles.VmdkWriteHandle(kwargs.get('session'), 197 kwargs.get('host'), 198 kwargs.get('port'), 199 kwargs.get('resource_pool'), 200 kwargs.get('vm_folder'), 201 kwargs.get('vm_import_spec'), 202 file_size, 203 kwargs.get('http_method', 'PUT')) 204 _start_transfer(read_handle, write_handle, timeout_secs) 205 return write_handle.get_imported_vm() 206 207 208def _get_vmdk_handle(ova_handle): 209 210 with tarfile.open(mode="r|", fileobj=ova_handle) as tar: 211 vmdk_name = None 212 for tar_info in tar: 213 if tar_info and tar_info.name.endswith(".ovf"): 214 vmdk_name = image_util.get_vmdk_name_from_ovf( 215 tar.extractfile(tar_info)) 216 elif vmdk_name and tar_info.name.startswith(vmdk_name): 217 # Actual file name is <vmdk_name>.XXXXXXX 218 return tar.extractfile(tar_info) 219 220 221def download_stream_optimized_image(context, timeout_secs, image_service, 222 image_id, **kwargs): 223 """Download stream optimized image from image service to VMware server. 224 225 :param context: image service write context 226 :param timeout_secs: time in seconds to wait for the download to complete 227 :param image_service: image service handle 228 :param image_id: ID of the image to be downloaded 229 :param kwargs: keyword arguments to configure the destination 230 VMDK write handle 231 :returns: managed object reference of the VM created for import to VMware 232 server 233 :raises: VimException, VimFaultException, VimAttributeException, 234 VimSessionOverLoadException, VimConnectionException, 235 ImageTransferException, ValueError 236 """ 237 metadata = image_service.show(context, image_id) 238 container_format = metadata.get('container_format') 239 240 LOG.debug("Downloading image: %(id)s (container: %(container)s) from image" 241 " service as a stream optimized file.", 242 {'id': image_id, 243 'container': container_format}) 244 245 # TODO(vbala) catch specific exceptions raised by download call 246 read_iter = image_service.download(context, image_id) 247 read_handle = rw_handles.ImageReadHandle(read_iter) 248 249 if container_format == 'ova': 250 read_handle = _get_vmdk_handle(read_handle) 251 if read_handle is None: 252 raise exceptions.ImageTransferException( 253 _("No vmdk found in the OVA image %s.") % image_id) 254 255 imported_vm = download_stream_optimized_data(context, timeout_secs, 256 read_handle, **kwargs) 257 258 LOG.debug("Downloaded image: %s from image service as a stream " 259 "optimized file.", 260 image_id) 261 return imported_vm 262 263 264def copy_stream_optimized_disk( 265 context, timeout_secs, write_handle, **kwargs): 266 """Copy virtual disk from VMware server to the given write handle. 267 268 :param context: context 269 :param timeout_secs: time in seconds to wait for the copy to complete 270 :param write_handle: copy destination 271 :param kwargs: keyword arguments to configure the source 272 VMDK read handle 273 :raises: VimException, VimFaultException, VimAttributeException, 274 VimSessionOverLoadException, VimConnectionException, 275 ImageTransferException, ValueError 276 """ 277 vmdk_file_path = kwargs.get('vmdk_file_path') 278 LOG.debug("Copying virtual disk: %(vmdk_path)s to %(dest)s.", 279 {'vmdk_path': vmdk_file_path, 280 'dest': write_handle.name}) 281 file_size = kwargs.get('vmdk_size') 282 read_handle = rw_handles.VmdkReadHandle(kwargs.get('session'), 283 kwargs.get('host'), 284 kwargs.get('port'), 285 kwargs.get('vm'), 286 kwargs.get('vmdk_file_path'), 287 file_size) 288 289 updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress) 290 try: 291 updater.start(interval=NFC_LEASE_UPDATE_PERIOD) 292 _start_transfer(read_handle, write_handle, timeout_secs) 293 finally: 294 updater.stop() 295 LOG.debug("Downloaded virtual disk: %s.", vmdk_file_path) 296 297 298# TODO(vbala) Remove dependency on image service provided by the client. 299def upload_image(context, timeout_secs, image_service, image_id, owner_id, 300 **kwargs): 301 """Upload the VM's disk file to image service. 302 303 :param context: image service write context 304 :param timeout_secs: time in seconds to wait for the upload to complete 305 :param image_service: image service handle 306 :param image_id: upload destination image ID 307 :param kwargs: keyword arguments to configure the source 308 VMDK read handle 309 :raises: VimException, VimFaultException, VimAttributeException, 310 VimSessionOverLoadException, VimConnectionException, 311 ImageTransferException, ValueError 312 """ 313 314 LOG.debug("Uploading to image: %s.", image_id) 315 file_size = kwargs.get('vmdk_size') 316 read_handle = rw_handles.VmdkReadHandle(kwargs.get('session'), 317 kwargs.get('host'), 318 kwargs.get('port'), 319 kwargs.get('vm'), 320 kwargs.get('vmdk_file_path'), 321 file_size) 322 323 # TODO(vbala) Remove this after we delete the keyword argument 'is_public' 324 # from all client code. 325 if 'is_public' in kwargs: 326 LOG.debug("Ignoring keyword argument 'is_public'.") 327 328 if 'image_version' in kwargs: 329 LOG.warning("The keyword argument 'image_version' is deprecated " 330 "and will be ignored in the next release.") 331 332 image_ver = str(kwargs.get('image_version')) 333 image_metadata = {'disk_format': 'vmdk', 334 'name': kwargs.get('image_name'), 335 'properties': {'vmware_image_version': image_ver, 336 'vmware_disktype': 'streamOptimized', 337 'owner_id': owner_id}} 338 339 updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress) 340 store_id = kwargs.get('store_id') 341 base_image_ref = kwargs.get('base_image_ref') 342 try: 343 updater.start(interval=NFC_LEASE_UPDATE_PERIOD) 344 image_service.update(context, image_id, image_metadata, 345 data=read_handle, store_id=store_id, 346 base_image_ref=base_image_ref) 347 finally: 348 updater.stop() 349 read_handle.close() 350 LOG.debug("Uploaded image: %s.", image_id) 351