1# Copyright (c) 2014 VMware, Inc.
2# All Rights Reserved.
3#
4#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5#    not use this file except in compliance with the License. You may obtain
6#    a copy of the License at
7#
8#         http://www.apache.org/licenses/LICENSE-2.0
9#
10#    Unless required by applicable law or agreed to in writing, software
11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13#    License for the specific language governing permissions and limitations
14#    under the License.
15
16"""
17Classes defining read and write handles for image transfer.
18
19This module defines various classes for reading and writing files including
20VMDK files in VMware servers. It also contains a class to read images from
21glance server.
22"""
23
24import logging
25import ssl
26import time
27
28from oslo_utils import excutils
29from oslo_utils import netutils
30import requests
31import urllib.parse as urlparse
32from urllib3 import connection as httplib
33
34from oslo_vmware._i18n import _
35from oslo_vmware import exceptions
36from oslo_vmware import vim_util
37
38
39LOG = logging.getLogger(__name__)
40
41MIN_PROGRESS_DIFF_TO_LOG = 25
42MIN_UPDATE_INTERVAL = 60
43READ_CHUNKSIZE = 65536
44USER_AGENT = 'OpenStack-ESX-Adapter'
45
46
47class FileHandle(object):
48    """Base class for VMware server file (including VMDK) access over HTTP.
49
50    This class wraps a backing file handle and provides utility methods
51    for various sub-classes.
52    """
53
54    def __init__(self, file_handle):
55        """Initializes the file handle.
56
57        :param file_handle: backing file handle
58        """
59        self._eof = False
60        self._file_handle = file_handle
61
62    def _create_connection(self, url, method, cacerts=False,
63                           ssl_thumbprint=None):
64        _urlparse = urlparse.urlparse(url)
65        scheme, netloc, path, params, query, fragment = _urlparse
66        if scheme == 'http':
67            conn = httplib.HTTPConnection(netloc)
68        elif scheme == 'https':
69            conn = httplib.HTTPSConnection(netloc)
70            cert_reqs = None
71
72            # cacerts can be either True or False or contain
73            # actual certificates. If it is a boolean, then
74            # we need to set cert_reqs and clear the cacerts
75            if isinstance(cacerts, bool):
76                if cacerts:
77                    cert_reqs = ssl.CERT_REQUIRED
78                else:
79                    cert_reqs = ssl.CERT_NONE
80                cacerts = requests.certs.where()
81            conn.set_cert(ca_certs=cacerts, cert_reqs=cert_reqs,
82                          assert_fingerprint=ssl_thumbprint)
83        else:
84            excep_msg = _("Invalid scheme: %s.") % scheme
85            LOG.error(excep_msg)
86            raise ValueError(excep_msg)
87
88        if query:
89            path = path + '?' + query
90        conn.putrequest(method, path)
91        return conn
92
93    def _create_read_connection(self, url, cookies=None, cacerts=False,
94                                ssl_thumbprint=None):
95        LOG.debug("Opening URL: %s for reading.", url)
96        try:
97            conn = self._create_connection(url, 'GET', cacerts, ssl_thumbprint)
98            vim_cookie = self._build_vim_cookie_header(cookies)
99            conn.putheader('User-Agent', USER_AGENT)
100            conn.putheader('Cookie', vim_cookie)
101            conn.endheaders()
102            return conn.getresponse()
103        except Exception as excep:
104            # TODO(vbala) We need to catch and raise specific exceptions
105            # related to connection problems, invalid request and invalid
106            # arguments.
107            excep_msg = _("Error occurred while opening URL: %s for "
108                          "reading.") % url
109            LOG.exception(excep_msg)
110            raise exceptions.VimException(excep_msg, excep)
111
112    def _create_write_connection(self, method, url,
113                                 file_size=None,
114                                 cookies=None,
115                                 overwrite=None,
116                                 content_type=None,
117                                 cacerts=False,
118                                 ssl_thumbprint=None):
119        """Create HTTP connection to write to VMDK file."""
120        LOG.debug("Creating HTTP connection to write to file with "
121                  "size = %(file_size)d and URL = %(url)s.",
122                  {'file_size': file_size,
123                   'url': url})
124        try:
125            conn = self._create_connection(url, method, cacerts,
126                                           ssl_thumbprint)
127            headers = {'User-Agent': USER_AGENT}
128            if file_size:
129                headers.update({'Content-Length': str(file_size)})
130            if overwrite:
131                headers.update({'Overwrite': overwrite})
132            if cookies:
133                headers.update({'Cookie':
134                               self._build_vim_cookie_header(cookies)})
135            if content_type:
136                headers.update({'Content-Type': content_type})
137            for key, value in headers.items():
138                conn.putheader(key, value)
139            conn.endheaders()
140            return conn
141        except requests.RequestException as excep:
142            excep_msg = _("Error occurred while creating HTTP connection "
143                          "to write to VMDK file with URL = %s.") % url
144            LOG.exception(excep_msg)
145            raise exceptions.VimConnectionException(excep_msg, excep)
146
147    def close(self):
148        """Close the file handle."""
149        try:
150            self._file_handle.close()
151        except Exception:
152            LOG.warning("Error occurred while closing the file handle",
153                        exc_info=True)
154
155    def _build_vim_cookie_header(self, vim_cookies):
156        """Build ESX host session cookie header."""
157        cookie_header = ""
158        for vim_cookie in vim_cookies:
159            cookie_header = vim_cookie.name + '=' + vim_cookie.value
160            break
161        return cookie_header
162
163    def write(self, data):
164        """Write data to the file.
165
166        :param data: data to be written
167        :raises: NotImplementedError
168        """
169        raise NotImplementedError()
170
171    def read(self, chunk_size):
172        """Read a chunk of data.
173
174        :param chunk_size: read chunk size
175        :raises: NotImplementedError
176        """
177        raise NotImplementedError()
178
179    def tell(self):
180        """Get the position of File Handle
181
182        :return: position
183        """
184        raise NotImplementedError()
185
186    def fileno(self):
187        """returns the integer file descriptor
188        by default this is not supported and raises IOError
189        """
190        raise IOError()
191
192    def seek(self, offset):
193        """sets the file's current position at the offset
194
195        :param offset: offset
196        """
197        pass
198
199    def flush(self):
200        """flushes the internal buffer
201
202        """
203        pass
204
205    def get_size(self):
206        """Get size of the file to be read.
207
208        :raises: NotImplementedError
209        """
210        raise NotImplementedError()
211
212    def _get_soap_url(self, scheme, host, port):
213        """Returns the IPv4/v6 compatible SOAP URL for the given host."""
214        if netutils.is_valid_ipv6(host):
215            return '%s://[%s]:%d' % (scheme, host, port)
216        return '%s://%s:%d' % (scheme, host, port)
217
218
219class FileWriteHandle(FileHandle):
220    """Write handle for a file in VMware server."""
221
222    def __init__(self, host, port, data_center_name, datastore_name, cookies,
223                 file_path, file_size, scheme='https', cacerts=False,
224                 thumbprint=None):
225        """Initializes the write handle with given parameters.
226
227        :param host: ESX/VC server IP address or host name
228        :param port: port for connection
229        :param data_center_name: name of the data center in the case of a VC
230                                 server
231        :param datastore_name: name of the datastore where the file is stored
232        :param cookies: cookies to build the vim cookie header
233        :param file_path: datastore path where the file is written
234        :param file_size: size of the file in bytes
235        :param scheme: protocol-- http or https
236        :param cacerts: CA bundle file to use for SSL verification
237        :param thumbprint: expected SHA1 thumbprint of server's certificate
238        :raises: VimConnectionException, ValueError
239        """
240        soap_url = self._get_soap_url(scheme, host, port)
241        param_list = {'dcPath': data_center_name, 'dsName': datastore_name}
242        self._url = '%s/folder/%s' % (soap_url, file_path)
243        self._url = self._url + '?' + urlparse.urlencode(param_list)
244
245        self._conn = self._create_write_connection('PUT',
246                                                   self._url,
247                                                   file_size,
248                                                   cookies=cookies,
249                                                   cacerts=cacerts,
250                                                   ssl_thumbprint=thumbprint)
251        FileHandle.__init__(self, self._conn)
252
253    def write(self, data):
254        """Write data to the file.
255
256        :param data: data to be written
257        :raises: VimConnectionException, VimException
258        """
259        try:
260            self._file_handle.send(data)
261        except requests.RequestException as excep:
262            excep_msg = _("Connection error occurred while writing data to"
263                          " %s.") % self._url
264            LOG.exception(excep_msg)
265            raise exceptions.VimConnectionException(excep_msg, excep)
266        except Exception as excep:
267            # TODO(vbala) We need to catch and raise specific exceptions
268            # related to connection problems, invalid request and invalid
269            # arguments.
270            excep_msg = _("Error occurred while writing data to"
271                          " %s.") % self._url
272            LOG.exception(excep_msg)
273            raise exceptions.VimException(excep_msg, excep)
274
275    def close(self):
276        """Get the response and close the connection."""
277        LOG.debug("Closing write handle for %s.", self._url)
278        try:
279            self._conn.getresponse()
280        except Exception:
281            LOG.warning("Error occurred while reading the HTTP response.",
282                        exc_info=True)
283        super(FileWriteHandle, self).close()
284
285    def __str__(self):
286        return "File write handle for %s" % self._url
287
288
289class VmdkHandle(FileHandle):
290    """VMDK handle based on HttpNfcLease."""
291
292    def __init__(self, session, lease, url, file_handle):
293        self._session = session
294        self._lease = lease
295        self._url = url
296        self._last_logged_progress = 0
297        self._last_progress_udpate = 0
298
299        super(VmdkHandle, self).__init__(file_handle)
300
301    def _log_progress(self, progress):
302        """Log data transfer progress."""
303        if (progress == 100 or (progress - self._last_logged_progress >=
304                                MIN_PROGRESS_DIFF_TO_LOG)):
305            LOG.debug("Data transfer progress is %d%%.", progress)
306            self._last_logged_progress = progress
307
308    def _get_progress(self):
309        """Get current progress for updating progress to lease."""
310        pass
311
312    def update_progress(self):
313        """Updates progress to lease.
314
315        This call back to the lease is essential to keep the lease alive
316        across long running write/read operations.
317
318        :raises: VimException, VimFaultException, VimAttributeException,
319                 VimSessionOverLoadException, VimConnectionException
320        """
321        now = time.time()
322        if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL):
323            return
324        self._last_progress_udpate = now
325        progress = int(self._get_progress())
326        self._log_progress(progress)
327
328        try:
329            self._session.invoke_api(self._session.vim,
330                                     'HttpNfcLeaseProgress',
331                                     self._lease,
332                                     percent=progress)
333        except exceptions.VimException:
334            with excutils.save_and_reraise_exception():
335                LOG.exception("Error occurred while updating the "
336                              "write/read progress of VMDK file "
337                              "with URL = %s.",
338                              self._url)
339
340    def _release_lease(self):
341        """Release the lease
342
343        :raises: VimException, VimFaultException, VimAttributeException,
344                 VimSessionOverLoadException, VimConnectionException
345        """
346        LOG.debug("Getting lease state for %s.", self._url)
347
348        state = self._session.invoke_api(vim_util,
349                                         'get_object_property',
350                                         self._session.vim,
351                                         self._lease,
352                                         'state')
353        LOG.debug("Lease for %(url)s is in state: %(state)s.",
354                  {'url': self._url,
355                   'state': state})
356        if self._get_progress() < 100:
357            LOG.error("Aborting lease for %s due to incomplete transfer.",
358                      self._url)
359            self._session.invoke_api(self._session.vim,
360                                     'HttpNfcLeaseAbort',
361                                     self._lease)
362        elif state == 'ready':
363            LOG.debug("Releasing lease for %s.", self._url)
364            self._session.invoke_api(self._session.vim,
365                                     'HttpNfcLeaseComplete',
366                                     self._lease)
367        else:
368            LOG.debug("Lease for %(url)s is in state: %(state)s; no "
369                      "need to release.",
370                      {'url': self._url,
371                       'state': state})
372
373    @staticmethod
374    def _create_import_vapp_lease(session, rp_ref, import_spec, vm_folder_ref):
375        """Create and wait for HttpNfcLease lease for vApp import."""
376        LOG.debug("Creating HttpNfcLease lease for vApp import into resource"
377                  " pool: %s.",
378                  rp_ref)
379        lease = session.invoke_api(session.vim,
380                                   'ImportVApp',
381                                   rp_ref,
382                                   spec=import_spec,
383                                   folder=vm_folder_ref)
384        LOG.debug("Lease: %(lease)s obtained for vApp import into resource"
385                  " pool %(rp_ref)s.",
386                  {'lease': lease,
387                   'rp_ref': rp_ref})
388        session.wait_for_lease_ready(lease)
389
390        LOG.debug("Invoking VIM API for reading info of lease: %s.", lease)
391        lease_info = session.invoke_api(vim_util,
392                                        'get_object_property',
393                                        session.vim,
394                                        lease,
395                                        'info')
396        return lease, lease_info
397
398    @staticmethod
399    def _create_export_vm_lease(session, vm_ref):
400        """Create and wait for HttpNfcLease lease for VM export."""
401        LOG.debug("Creating HttpNfcLease lease for exporting VM: %s.",
402                  vm_ref)
403        lease = session.invoke_api(session.vim, 'ExportVm', vm_ref)
404        LOG.debug("Lease: %(lease)s obtained for exporting VM: %(vm_ref)s.",
405                  {'lease': lease,
406                   'vm_ref': vm_ref})
407        session.wait_for_lease_ready(lease)
408
409        LOG.debug("Invoking VIM API for reading info of lease: %s.", lease)
410        lease_info = session.invoke_api(vim_util,
411                                        'get_object_property',
412                                        session.vim,
413                                        lease,
414                                        'info')
415        return lease, lease_info
416
417    @staticmethod
418    def _fix_esx_url(url, host, port):
419        """Fix netloc in the case of an ESX host.
420
421        In the case of an ESX host, the netloc is set to '*' in the URL
422        returned in HttpNfcLeaseInfo. It should be replaced with host name
423        or IP address.
424        """
425        urlp = urlparse.urlparse(url)
426        if urlp.netloc == '*':
427            scheme, netloc, path, params, query, fragment = urlp
428            if netutils.is_valid_ipv6(host):
429                netloc = '[%s]:%d' % (host, port)
430            else:
431                netloc = "%s:%d" % (host, port)
432            url = urlparse.urlunparse((scheme,
433                                       netloc,
434                                       path,
435                                       params,
436                                       query,
437                                       fragment))
438        return url
439
440    @staticmethod
441    def _find_vmdk_url(lease_info, host, port):
442        """Find the URL corresponding to a VMDK file in lease info."""
443        url = None
444        ssl_thumbprint = None
445        for deviceUrl in lease_info.deviceUrl:
446            if deviceUrl.disk:
447                url = VmdkHandle._fix_esx_url(deviceUrl.url, host, port)
448                ssl_thumbprint = deviceUrl.sslThumbprint
449                break
450        if not url:
451            excep_msg = _("Could not retrieve VMDK URL from lease info.")
452            LOG.error(excep_msg)
453            raise exceptions.VimException(excep_msg)
454        LOG.debug("Found VMDK URL: %s from lease info.", url)
455        return url, ssl_thumbprint
456
457
458class VmdkWriteHandle(VmdkHandle):
459    """VMDK write handle based on HttpNfcLease.
460
461    This class creates a vApp in the specified resource pool and uploads the
462    virtual disk contents.
463    """
464
465    def __init__(self, session, host, port, rp_ref, vm_folder_ref, import_spec,
466                 vmdk_size, http_method='PUT'):
467        """Initializes the VMDK write handle with input parameters.
468
469        :param session: valid API session to ESX/VC server
470        :param host: ESX/VC server IP address or host name
471        :param port: port for connection
472        :param rp_ref: resource pool into which the backing VM is imported
473        :param vm_folder_ref: VM folder in ESX/VC inventory to use as parent
474                              of backing VM
475        :param import_spec: import specification of the backing VM
476        :param vmdk_size: size of the backing VM's VMDK file
477        :param http_method: either PUT or POST
478        :raises: VimException, VimFaultException, VimAttributeException,
479                 VimSessionOverLoadException, VimConnectionException,
480                 ValueError
481        """
482        self._vmdk_size = vmdk_size
483        self._bytes_written = 0
484
485        # Get lease and its info for vApp import
486        lease, lease_info = self._create_import_vapp_lease(session,
487                                                           rp_ref,
488                                                           import_spec,
489                                                           vm_folder_ref)
490
491        # Find VMDK URL where data is to be written
492        url, thumbprint = self._find_vmdk_url(lease_info, host, port)
493        self._vm_ref = lease_info.entity
494
495        cookies = session.vim.client.cookiejar
496        # Create HTTP connection to write to VMDK URL
497        if http_method == 'PUT':
498            overwrite = 't'
499            content_type = 'binary/octet-stream'
500        elif http_method == 'POST':
501            overwrite = None
502            content_type = 'application/x-vnd.vmware-streamVmdk'
503        else:
504            raise ValueError('http_method must be either PUT or POST')
505        self._conn = self._create_write_connection(http_method,
506                                                   url,
507                                                   vmdk_size,
508                                                   cookies=cookies,
509                                                   overwrite=overwrite,
510                                                   content_type=content_type,
511                                                   ssl_thumbprint=thumbprint)
512        super(VmdkWriteHandle, self).__init__(session, lease, url, self._conn)
513
514    def get_imported_vm(self):
515        """"Get managed object reference of the VM created for import.
516
517        :raises: VimException
518        """
519        if self._get_progress() < 100:
520            excep_msg = _("Incomplete VMDK upload to %s.") % self._url
521            LOG.exception(excep_msg)
522            raise exceptions.ImageTransferException(excep_msg)
523        return self._vm_ref
524
525    def tell(self):
526        return self._bytes_written
527
528    def write(self, data):
529        """Write data to the file.
530
531        :param data: data to be written
532        :raises: VimConnectionException, VimException
533        """
534        try:
535            self._file_handle.send(data)
536            self._bytes_written += len(data)
537        except requests.RequestException as excep:
538            excep_msg = _("Connection error occurred while writing data to"
539                          " %s.") % self._url
540            LOG.exception(excep_msg)
541            raise exceptions.VimConnectionException(excep_msg, excep)
542        except Exception as excep:
543            # TODO(vbala) We need to catch and raise specific exceptions
544            # related to connection problems, invalid request and invalid
545            # arguments.
546            excep_msg = _("Error occurred while writing data to"
547                          " %s.") % self._url
548            LOG.exception(excep_msg)
549            raise exceptions.VimException(excep_msg, excep)
550
551    def close(self):
552        """Releases the lease and close the connection.
553
554        :raises: VimAttributeException, VimSessionOverLoadException,
555                 VimConnectionException
556        """
557        try:
558            self._release_lease()
559        except exceptions.ManagedObjectNotFoundException:
560            LOG.info("Lease for %(url)s not found.  No need to release.",
561                     {'url': self._url})
562            return
563        except exceptions.VimException:
564            LOG.warning("Error occurred while releasing the lease "
565                        "for %s.",
566                        self._url,
567                        exc_info=True)
568        super(VmdkWriteHandle, self).close()
569        LOG.debug("Closed VMDK write handle for %s.", self._url)
570
571    def _get_progress(self):
572        return float(self._bytes_written) / self._vmdk_size * 100
573
574    def __str__(self):
575        return "VMDK write handle for %s" % self._url
576
577
578class VmdkReadHandle(VmdkHandle):
579    """VMDK read handle based on HttpNfcLease."""
580
581    def __init__(self, session, host, port, vm_ref, vmdk_path,
582                 vmdk_size):
583        """Initializes the VMDK read handle with the given parameters.
584
585        During the read (export) operation, the VMDK file is converted to a
586        stream-optimized sparse disk format. Therefore, the size of the VMDK
587        file read may be smaller than the actual VMDK size.
588
589        :param session: valid api session to ESX/VC server
590        :param host: ESX/VC server IP address or host name
591        :param port: port for connection
592        :param vm_ref: managed object reference of the backing VM whose VMDK
593                       is to be exported
594        :param vmdk_path: path of the VMDK file to be exported
595        :param vmdk_size: actual size of the VMDK file
596        :raises: VimException, VimFaultException, VimAttributeException,
597                 VimSessionOverLoadException, VimConnectionException
598        """
599        self._vmdk_size = vmdk_size
600        self._bytes_read = 0
601
602        # Obtain lease for VM export
603        lease, lease_info = self._create_export_vm_lease(session, vm_ref)
604
605        # find URL of the VMDK file to be read and open connection
606        url, thumbprint = self._find_vmdk_url(lease_info, host, port)
607        cookies = session.vim.client.cookiejar
608        self._conn = self._create_read_connection(url,
609                                                  cookies=cookies,
610                                                  ssl_thumbprint=thumbprint)
611        super(VmdkReadHandle, self).__init__(session, lease, url, self._conn)
612
613    def read(self, chunk_size=READ_CHUNKSIZE):
614        """Read a chunk of data from the VMDK file.
615
616        :param chunk_size: size of read chunk
617        :returns: the data
618        :raises: VimException
619        """
620        try:
621            data = self._file_handle.read(chunk_size)
622            self._bytes_read += len(data)
623            return data
624        except Exception as excep:
625            # TODO(vbala) We need to catch and raise specific exceptions
626            # related to connection problems, invalid request and invalid
627            # arguments.
628            excep_msg = _("Error occurred while reading data from"
629                          " %s.") % self._url
630            LOG.exception(excep_msg)
631            raise exceptions.VimException(excep_msg, excep)
632
633    def tell(self):
634        return self._bytes_read
635
636    def close(self):
637        """Releases the lease and close the connection.
638
639        :raises: VimException, VimFaultException, VimAttributeException,
640                 VimSessionOverLoadException, VimConnectionException
641        """
642        try:
643            self._release_lease()
644        except exceptions.ManagedObjectNotFoundException:
645            LOG.info("Lease for %(url)s not found.  No need to release.",
646                     {'url': self._url})
647            return
648        except exceptions.VimException:
649            LOG.warning("Error occurred while releasing the lease "
650                        "for %s.",
651                        self._url,
652                        exc_info=True)
653            raise
654        finally:
655            super(VmdkReadHandle, self).close()
656        LOG.debug("Closed VMDK read handle for %s.", self._url)
657
658    def _get_progress(self):
659        return float(self._bytes_read) / self._vmdk_size * 100
660
661    def __str__(self):
662        return "VMDK read handle for %s" % self._url
663
664
665class ImageReadHandle(object):
666    """Read handle for glance images."""
667
668    def __init__(self, glance_read_iter):
669        """Initializes the read handle with given parameters.
670
671        :param glance_read_iter: iterator to read data from glance image
672        """
673        self._glance_read_iter = glance_read_iter
674        self._iter = self.get_next()
675
676    def read(self, chunk_size):
677        """Read an item from the image data iterator.
678
679        The input chunk size is ignored since the client ImageBodyIterator
680        uses its own chunk size.
681        """
682        try:
683            data = next(self._iter)
684            return data
685        except StopIteration:
686            LOG.debug("Completed reading data from the image iterator.")
687            return ""
688
689    def get_next(self):
690        """Get the next item from the image iterator."""
691        for data in self._glance_read_iter:
692            yield data
693
694    def close(self):
695        """Close the read handle.
696
697        This is a NOP.
698        """
699        pass
700
701    def __str__(self):
702        return "Image read handle"
703