# Copyright (c) 2017 Veritas Technologies LLC.  All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""
Cinder Driver for HyperScale
"""

import os

from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import six

from cinder import exception
from cinder.i18n import _
from cinder.image import image_utils
from cinder import interface
from cinder import utils
from cinder.volume import driver
from cinder.volume.drivers.veritas import hs_constants as constants
from cinder.volume.drivers.veritas import utils as util

CONF = cfg.CONF
LOG = logging.getLogger(__name__)
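
# Snapshot type markers stored in the snapshot metadata key 'TYPE' so that
# episodic, user initiated and workflow snapshots can be filtered apart.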
TYPE_EPISODIC_SNAP = '0'
TYPE_USER_SNAP = '1'
TYPE_WORKFLOW_SNAP = '2'

BLOCK_SIZE = 8
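# Volumes whose metadata carries 'reflection_targets' are created with
# MAX_REPLICAS copies; all other volumes get DEFAULT_REPLICAS
# (see _get_replicas()).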
MAX_REPLICAS = 2
DEFAULT_REPLICAS = 1
POOL_NAME = '{30c39970-ad80-4950-5490-8431abfaaaf0}'
HYPERSCALE_VERSION = '1.0.0'
PROVIDER_LOCATION_MNT = "/hyperscale"
PROVIDER_LOCATION = 'hyperscale-sv:' + PROVIDER_LOCATION_MNT


@interface.volumedriver
class HyperScaleDriver(driver.VolumeDriver):

    VERSION = '1.0'
    # ThirdPartySystems wiki page
    CI_WIKI_NAME = "Veritas_HyperScale_CI"

    def __init__(self, *args, **kwargs):
        """Initialization."""

        super(HyperScaleDriver, self).__init__(*args, **kwargs)

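        # Per-compute VSA bookkeeping, populated from the HyperScale
        # configuration in do_setup():
        #   compute_map:      VSA data IP -> VSA isolated IP
        #   vsa_map:          VSA data IP -> VSA config section header
        #   compute_meta_map: compute name -> VSA data IP
        #   vsa_compute_map:  VSA data IP -> compute name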
        self.compute_map = {}
        self.vsa_map = {}
        self.compute_meta_map = {}
        self.vsa_compute_map = {}
        self.old_total = 0
        self.old_free = 0
        self.my_dnid = None

    @staticmethod
    def _fetch_config_for_controller():
        return HyperScaleDriver._fetch_config_information(
            persona='controller')

    @staticmethod
    def _fetch_config_for_compute():
        return HyperScaleDriver._fetch_config_information(
            persona='compute')

    @staticmethod
    def _fetch_config_for_datanode():
        return HyperScaleDriver._fetch_config_information(
            persona='datanode')

    @staticmethod
    def _fetch_config_information(persona):
        # Get hyperscale config information for persona
        configuration = util.get_configuration(persona)
        return configuration

    @utils.trace_method
    def check_for_setup_error(self):
        # check if HyperScale has been installed correctly
        try:
            version = util.get_hyperscale_version()

            if version != HYPERSCALE_VERSION:
                raise exception.VolumeBackendAPIException(
                    data=(_("Unsupported version: %s") % version))
        except (exception.ErrorInHyperScaleVersion,
                exception.UnableToExecuteHyperScaleCmd):
            err_msg = _('Exception in getting HyperScale version')
            LOG.exception(err_msg)
            raise exception.VolumeBackendAPIException(data=err_msg)

    def _get_replicas(self, volume, metadata):
        """Get the replicas."""
        try:
            ref_targets = self._get_volume_metadata_value(metadata,
                                                          'reflection_targets')
            if ref_targets is not None:
                replicas = MAX_REPLICAS
            else:
                replicas = DEFAULT_REPLICAS

        except Exception:
            LOG.exception("Exception in getting reflection targets")
            replicas = DEFAULT_REPLICAS

        LOG.debug("Number of replicas: %s", replicas)
        return replicas

    @utils.trace_method
    def do_setup(self, context):
        """Any initialization the volume driver does while starting."""
        super(HyperScaleDriver, self).do_setup(context)

        try:
            # Get computes info
            computes = HyperScaleDriver._fetch_config_for_compute()
            if computes is None:
                computes = {}

            for compute in computes.keys():
                if 'disabled' in computes[compute].keys():
                    disabled = computes[compute]['disabled']
                    if disabled == "1":
                        continue
                vsa_ip = computes[compute]['vsa_ip']
                vsa_isolated_ip = computes[compute]['vsa_isolated_ip']
                vsa_section_header = computes[compute]['vsa_section_header']
                compute_name = computes[compute]['compute_name']
                self.compute_map[vsa_ip] = vsa_isolated_ip
                self.vsa_map[vsa_ip] = vsa_section_header
                self.compute_meta_map[compute_name] = vsa_ip
                self.vsa_compute_map[vsa_ip] = compute_name

            # Get controller info
            cntr_info = HyperScaleDriver._fetch_config_for_controller()
            if cntr_info is None:
                cntr_info = {}

            # Get data node info
            self.my_dnid = util.get_datanode_id()
            datanodes = HyperScaleDriver._fetch_config_for_datanode()
            if datanodes is None:
                datanodes = {}

            for key, value in datanodes.items():
                if self.my_dnid == value['hypervisor_id']:
                    self.datanode_hostname = value['datanode_name']
                    self.datanode_ip = value['data_ip']
                    self.dn_routing_key = value['hypervisor_id']

            LOG.debug("In init compute_map %s", self.compute_map)
            LOG.debug("In init vsa_map %s", self.vsa_map)
            LOG.debug("In init compute_meta_map %s", self.compute_meta_map)

        except (exception.UnableToProcessHyperScaleCmdOutput,
                exception.ErrorInFetchingConfiguration):
            err_msg = _("Unable to initialise the Veritas cinder driver")
            LOG.exception(err_msg)
            raise exception.VolumeBackendAPIException(data=err_msg)

        except Exception:
            err_msg = _("Internal error occurred")
            LOG.exception(err_msg)
            raise exception.VolumeBackendAPIException(data=err_msg)

    @utils.trace_method
    def create_cloned_volume(self, volume, src_vref):
        """Creates a clone of the specified volume."""

        LOG.debug("Clone volume")
        model_update = {}
        try:
            LOG.debug("Clone new volume %(t_id)s from source volume %(s_id)s",
                      {"t_id": volume['id'], "s_id": src_vref['id']})
            # 1. Make a call to DN
            # Check if current_dn_owner is set.

            rt_key = None
            # Get metadata for volume
            metadata = self._get_volume_metadata(src_vref)
            rt_key = self._get_volume_metadata_value(metadata,
                                                     'current_dn_owner')
            if rt_key is None:
                rt_key = self.dn_routing_key

            util.message_data_plane(
                rt_key,
                'hyperscale.storage.dm.volume.clone',
                pool_name=POOL_NAME,
                display_name=util.get_guid_with_curly_brackets(
                    volume['id']),
                version_name=util.get_guid_with_curly_brackets(
                    src_vref['id']),
                volume_raw_size=volume['size'],
                volume_qos=1,
                parent_volume_guid=util.get_guid_with_curly_brackets(
                    src_vref['id']),
                user_id=util.get_guid_with_curly_brackets(
                    volume['user_id']),
                project_id=util.get_guid_with_curly_brackets(
                    volume['project_id']),
                volume_guid=util.get_guid_with_curly_brackets(
                    volume['id']))

            LOG.debug("Volume cloned successfully on data node")

            # Get metadata for volume
            volume_metadata = self._get_volume_metadata(volume)
            parent_cur_dn = self._get_volume_metadata_value(metadata,
                                                            'current_dn_ip')

            metadata_update = {}
            metadata_update['Primary_datanode_ip'] = parent_cur_dn
            metadata_update['current_dn_owner'] = rt_key
            metadata_update['current_dn_ip'] = parent_cur_dn
            metadata_update['source_volid'] = src_vref['id']
            metadata_update['size'] = src_vref['size']

            # 2. Choose a potential replica here.
            # The actual decision to have potential replica is made in NOVA.
            rt_key, rt_dn_ip = self._select_rt(volume,
                                               volume_metadata,
                                               only_select=True)

            if rt_key and rt_dn_ip:
                metadata_update['Potential_secondary_key'] = rt_key
                metadata_update['Potential_secondary_ip'] = rt_dn_ip

        except (exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception('Exception in clone volume', exc_info=True)
        except exception.InvalidMetadataType:
            with excutils.save_and_reraise_exception():
                LOG.exception('Exception updating metadata in clone'
                              ' volume', exc_info=True)

        volume_metadata.update(metadata_update)
        volume['provider_location'] = PROVIDER_LOCATION
        model_update = {'provider_location': volume['provider_location'],
                        'metadata': volume_metadata}

        return model_update

    def _get_datanodes_info(self):
        # Get hyperscale datanode config information from controller

        msg_body = {}
        data = None

        try:
            cmd_out, cmd_error = util.message_controller(
                constants.HS_CONTROLLER_EXCH,
                'hyperscale.controller.get.membership',
                **msg_body)
            LOG.debug("Response Message from Controller: %s",
                      cmd_out)
            payload = cmd_out.get('payload')
            data = payload.get('of_membership')

        except (exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception("Failed to get datanode config "
                              "information from controller")

        return data

    def _select_rt(self, volume, metadata, only_select=False):

        # For the boot vdisk(first vdisk) of the instance, choose any
        # reflection target other than this. For the data disks,
        # retain the reflection target.
        # It will be passed by the caller after reading it from instance
        # metadata.

        LOG.debug("_select_rt ")
        rt_key = self._get_volume_metadata_value(metadata,
                                                 'Secondary_datanode_key')
        rt_dn_ip = self._get_volume_metadata_value(metadata,
                                                   'Secondary_datanode_ip')
        current_dn_ip = self._get_volume_metadata_value(metadata,
                                                        'current_dn_ip')

        if current_dn_ip is not None and rt_dn_ip == current_dn_ip:
            return None, None

        if rt_key is not None and rt_dn_ip is not None:
            return rt_key, rt_dn_ip

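        # 'NA' acts as a sentinel meaning no reflection target is available;
        # _create_replica() checks for it before messaging the peer data node.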
        rt_key = 'NA'
        rt_dn_ip = 'NA'
        datanodes = self._get_datanodes_info()
        LOG.debug("Data nodes: %s", datanodes)

        for key, value in datanodes.items():
            if value['personality'] == 'datanode':
                if self.my_dnid != value['hypervisor_id']:
                    LOG.debug("reflection target hypervisor_id: %s",
                              value['hypervisor_id'])
                    LOG.debug("my hypervisor_id: %s", self.my_dnid)
                    rt_dn_ip = value['data_ip']
                    rt_key = value['hypervisor_id']

        if only_select:
            return rt_key, rt_dn_ip

        return rt_key, rt_dn_ip

    def _create_replica(self, volume, metadata):
        """Create vdisk on peer data node."""

        try:
            reflection_target_ip = None
            rt_routing_key, reflection_target_ip = (
                self._select_rt(volume, metadata))
            LOG.debug("_create_replica %(rt_key)s %(rt_ip)s",
                      {"rt_key": rt_routing_key,
                       "rt_ip": reflection_target_ip})

            metadata_update = {}
            metadata_update['Secondary_datanode_key'] = rt_routing_key
            metadata_update['Secondary_datanode_ip'] = reflection_target_ip

            if rt_routing_key is None or rt_routing_key == 'NA':
                return False, None, metadata_update

            instance_id = self._get_volume_metadata_value(metadata,
                                                          'InstanceId')

            util.message_data_plane(
                rt_routing_key,
                'hyperscale.storage.dm.volume.create',
                pool_name=POOL_NAME,
                volume_guid=util.get_guid_with_curly_brackets(
                    volume['id']),
                display_name=util.get_guid_with_curly_brackets(
                    volume['id']),
                volume_raw_size=volume['size'],
                vm_id=util.get_guid_with_curly_brackets(
                    six.text_type(instance_id)),
                is_reflection_source=0,
                dn_reflection_factor=1,
                reflection_src_ip=self.datanode_ip,
                user_id=util.get_guid_with_curly_brackets(
                    volume['user_id']),
                project_id=util.get_guid_with_curly_brackets(
                    volume['project_id']),
                volume_qos=1)
            # Failure handling TBD.
            ret = True
            LOG.debug("Create volume sent to reflection target data node")

        except (exception.VolumeNotFound,
                exception.UnableToProcessHyperScaleCmdOutput,
                exception.ErrorInSendingMsg):
            LOG.error("Exception in creating replica", exc_info=True)
            metadata_update['Secondary_datanode_key'] = 'NA'
            metadata_update['Secondary_datanode_ip'] = 'NA'
            metadata_update['DN_Resiliency'] = 'degraded'
            ret = False
        return ret, reflection_target_ip, metadata_update

    def _get_volume_details_for_create_volume(self,
                                              reflection_target_ip,
                                              volume,
                                              metadata):
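        """Build the kwargs for the data node volume create message."""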

        instance_id = self._get_volume_metadata_value(metadata,
                                                      'InstanceId')
        volume_details = {}
        volume_details['pool_name'] = POOL_NAME
        volume_details['volume_guid'] = (
            util.get_guid_with_curly_brackets(volume['id']))
        volume_details['display_name'] = (
            util.get_guid_with_curly_brackets(volume['id']))
        volume_details['volume_raw_size'] = volume['size']
        volume_details['vm_id'] = util.get_guid_with_curly_brackets(
            six.text_type(instance_id))
        volume_details['user_id'] = util.get_guid_with_curly_brackets(
            volume['user_id'])
        volume_details['project_id'] = (
            util.get_guid_with_curly_brackets(volume['project_id']))
        volume_details['volume_qos'] = 1
        volume_details['dn_reflection_factor'] = 0

        if reflection_target_ip is not None:
            volume_details['is_reflection_source'] = 1
            volume_details['dn_reflection_factor'] = 1
            volume_details['reflection_target_ip'] = reflection_target_ip

        return volume_details

    def _get_volume_metadata(self, volume):
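        """Flatten the volume's metadata list into a key/value dict."""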
        volume_metadata = {}
        if 'volume_metadata' in volume:
            for metadata in volume['volume_metadata']:
                volume_metadata[metadata['key']] = metadata['value']
        return volume_metadata

    def _get_volume_metadata_value(self, metadata, metadata_key):
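        """Look up a single key in a metadata dict, logging the result."""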
        metadata_value = None
        if metadata:
            metadata_value = metadata.get(metadata_key)

        LOG.debug("Volume metadata key %(m_key)s, value %(m_val)s",
                  {"m_key": metadata_key, "m_val": metadata_value})
        return metadata_value

    @utils.trace_method
    def create_volume(self, volume):
        """Creates a hyperscale volume."""

        model_update = {}
        metadata_update = {}
        reflection_target_ip = None
        LOG.debug("Create volume")
        try:
            volume_metadata = self._get_volume_metadata(volume)

            # 1. Check how many replicas need to be created.
            replicas = self._get_replicas(volume, volume_metadata)
            if replicas > 1:
                # 2. Create replica on peer datanode.
                LOG.debug("Create volume message sent to peer data node")
                ret, reflection_target_ip, metadata_update = (
                    self._create_replica(volume, volume_metadata))
                if ret is False:
                    metadata_update['DN_Resiliency'] = 'degraded'
                    # Do not fail volume creation, just create one replica.
                    reflection_target_ip = None

            # 3. Get volume details based on reflection factor
            #    for volume
            volume_details = self._get_volume_details_for_create_volume(
                reflection_target_ip, volume, volume_metadata)

            # 4. Send create volume to data node with volume details
            util.message_data_plane(
                self.dn_routing_key,
                'hyperscale.storage.dm.volume.create',
                **volume_details)
            LOG.debug("Create volume message sent to data node")

            volume_metadata['Primary_datanode_ip'] = self.datanode_ip
            volume_metadata['current_dn_owner'] = self.dn_routing_key
            volume_metadata['current_dn_ip'] = self.datanode_ip
            volume_metadata['hs_image_id'] = util.get_hyperscale_image_id()
            volume_metadata.update(metadata_update)

            volume['provider_location'] = PROVIDER_LOCATION
            model_update = {'provider_location': volume['provider_location'],
                            'metadata': volume_metadata}

        except (exception.UnableToProcessHyperScaleCmdOutput,
                exception.ErrorInSendingMsg):
            with excutils.save_and_reraise_exception():
                LOG.exception('Unable to create hyperscale volume')

        return model_update

    @utils.trace_method
    def delete_volume(self, volume):
        """Deletes a volume."""

        LOG.debug("Delete volume with id %s", volume['id'])
        # 1. Check for provider location
        if not volume['provider_location']:
            LOG.warning('Volume %s does not have provider_location specified',
                        volume['name'])
            raise exception.VolumeMetadataNotFound(
                volume_id=volume['id'],
                metadata_key='provider_location')

        # 2. Message data plane for volume deletion
        message_body = {'display_name': volume['name']}

        # if Secondary_datanode_key is present,
        # delete the replica from secondary datanode.
        rt_key = None

        # Get metadata for volume
        metadata = self._get_volume_metadata(volume)

        rt_key = self._get_volume_metadata_value(metadata,
                                                 'Secondary_datanode_key')
        rt_dn_ip = self._get_volume_metadata_value(metadata,
                                                   'Secondary_datanode_ip')
        current_dn_ip = self._get_volume_metadata_value(metadata,
                                                        'current_dn_ip')
        if current_dn_ip is not None and rt_dn_ip == current_dn_ip:
            rt_key = None

        # Send Delete Volume to Data Node
        try:
            if rt_key is not None:
                util.message_data_plane(
                    rt_key,
                    'hyperscale.storage.dm.volume.delete',
                    **message_body)

            util.message_data_plane(
                self.dn_routing_key,
                'hyperscale.storage.dm.volume.delete',
                **message_body)

        except (exception.UnableToProcessHyperScaleCmdOutput,
                exception.ErrorInSendingMsg):
            LOG.error('Exception while deleting volume', exc_info=True)
            raise exception.VolumeIsBusy(volume_name=volume['name'])

    @utils.trace_method
    def create_snapshot(self, snapshot):
        """Create a snapshot."""

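        # Three snapshot paths are handled below:
        #   1. Metadata contains a 'SNAPSHOT-COOKIE': refresh an existing
        #      episodic snapshot on the data node and return.
        #   2. util.episodic_snap() requests a metadata-only update: return.
        #   3. Otherwise the snapshot is user or workflow initiated and is
        #      created through the compute node VSA.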
        LOG.debug("Create Snapshot %s", snapshot['volume_id'])
        workflow_id = None
        last_in_eds_seq = None
        model_update = {}
        rt_key = None

        # Get metadata for volume
        snapshot_volume = snapshot.get('volume')
        metadata = snapshot_volume['metadata']
        rt_key = self._get_volume_metadata_value(metadata,
                                                 'current_dn_owner')
        if rt_key is None:
            rt_key = self.dn_routing_key

        # Check for episodic based on metadata key
        workflow_snap = 0

        meta = snapshot.get('metadata')
        LOG.debug('Snapshot metadata %s', meta)
        if 'SNAPSHOT-COOKIE' in meta.keys():
            snapsize = meta['SIZE']

            # Call DataNode for episodic snapshots
            LOG.debug('Calling Data Node for episodic snapshots')
            message_body = {}
            message_body['snapshot_id'] = (
                util.get_guid_with_curly_brackets(snapshot['id']))
            message_body['volume_guid'] = (
                util.get_guid_with_curly_brackets(
                    snapshot['volume_id']))
            message_body['snapshot_cookie'] = meta['SNAPSHOT-COOKIE']

            try:
                # send message to data node
                util.message_data_plane(
                    rt_key,
                    'hyperscale.storage.dm.volume.snapshot.update',
                    **message_body)

                # Update size via cinder api
                if snapsize is not None:
                    model_update['volume_size'] = snapsize.value

                # Set the episodic type metadata for filtering purpose
                meta['TYPE'] = TYPE_EPISODIC_SNAP
                meta['status'] = 'available'
                meta['datanode_ip'] = self.datanode_ip

            except (exception.VolumeNotFound,
                    exception.UnableToExecuteHyperScaleCmd,
                    exception.UnableToProcessHyperScaleCmdOutput):
                with excutils.save_and_reraise_exception():
                    LOG.exception('Exception in create snapshot')

            model_update['metadata'] = meta
            return model_update

        else:
            out_meta = util.episodic_snap(meta)
            if out_meta.get('update'):
                meta['TYPE'] = out_meta.get('TYPE')
                meta['status'] = out_meta.get('status')
                meta['datanode_ip'] = self.datanode_ip
                model_update['metadata'] = meta
                return model_update

        if 'workflow_id' in meta.keys():
            workflow_snap = 1
            workflow_id = meta['workflow_id']

        if 'monitor_snap' in meta.keys():
            if int(meta['monitor_snap']) == constants.SNAP_RESTORE_RF:
                last_in_eds_seq = 0
            else:
                last_in_eds_seq = 1

        # If the code falls through to here, it is a user initiated snapshot
        try:
            # Get metadata for volume
            vsa_routing_key = None
            snapshot_volume = snapshot.get('volume')
            metadata = snapshot_volume['metadata']
            LOG.debug('Calling Compute Node for user initiated snapshots')
            vsa_ip = self._get_volume_metadata_value(metadata,
                                                     'acting_vdisk_owner')
            if vsa_ip is None:
                vsa_ip = self._get_volume_metadata_value(metadata, 'vsa_ip')

            LOG.debug("Create snap on compute vsa %s", vsa_ip)
            if vsa_ip:
                vsa_routing_key = vsa_ip.replace('.', '')

            message_body = {}
            # Set the parent volume id
            message_body['vdisk_id_str'] = (
                util.get_guid_with_curly_brackets(
                    snapshot['volume_id']))
            # Set the snapshot details
            message_body['snapshot_id_str'] = (
                util.get_guid_with_curly_brackets(snapshot['id']))
            message_body['snapshot_name'] = snapshot['name']

            if workflow_snap == 1:
                message_body['workflow_snapshot'] = 1
            else:
                message_body['user_initiated'] = 1

            if last_in_eds_seq is not None:
                message_body['last_in_eds_seq'] = last_in_eds_seq

            # send message to compute node
            util.message_compute_plane(
                vsa_routing_key,
                'hyperscale.storage.nfs.volume.snapshot.create',
                **message_body)

            # Set the snapshot type to either workflow or user initiated
            # snapshot in metadata for filtering purpose
            if workflow_snap:
                LOG.debug('__help request for WORKFLOW snapshot')
                meta['TYPE'] = TYPE_WORKFLOW_SNAP
                meta['status'] = 'creating'
                meta['datanode_ip'] = self.datanode_ip
            else:
                LOG.debug('__help request for MANUAL snapshot')
                meta['TYPE'] = TYPE_USER_SNAP
                meta['status'] = 'creating'
                meta['datanode_ip'] = self.datanode_ip

            if workflow_id is not None:
                message_body = {}
                message_body['workflow_id'] = workflow_id
                message_body['skip_upto_sentinel'] = (
                    'hyperscale.vdisk.failback.snapmark_sentinel')

                # send message to controller node
                util.message_controller(
                    constants.HS_CONTROLLER_EXCH,
                    'hyperscale.controller.execute.workflow',
                    **message_body)

        except (exception.VolumeNotFound,
                exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception('Exception in create snapshot')

        model_update['metadata'] = meta
        return model_update

    @utils.trace_method
    def delete_snapshot(self, snapshot):
        """Deletes a snapshot."""

        meta = snapshot.get('metadata')
        if 'force' in meta.keys():
            LOG.debug("Found force flag for snapshot metadata."
                      " Not sending call to datanode")
            LOG.debug('snapshot metadata %s', meta)
            return

        if 'is_busy' in meta.keys():
            LOG.warning("Snapshot %s is being used, skipping delete",
                        snapshot['id'])
            raise exception.SnapshotIsBusy(snapshot_name=snapshot['id'])
        else:
            LOG.warning("Snapshot %s is being deleted,"
                        " is_busy key not present", snapshot['id'])

        message_body = {}
        message_body['volume_guid'] = (
            util.get_guid_with_curly_brackets(snapshot['volume_id']))
        message_body['snapshot_id'] = (
            util.get_guid_with_curly_brackets(snapshot['id']))

        # HyperScale snapshots, whether episodic or user initiated, all reside
        # in the data plane, so the delete snapshot operation goes to the
        # data node.
        rt_key = None

        # Get metadata for volume
        snapshot_volume = snapshot.get('volume')
        metadata = snapshot_volume['metadata']
        rt_key = self._get_volume_metadata_value(metadata,
                                                 'current_dn_owner')
        if rt_key is None:
            rt_key = self.dn_routing_key

        try:
            # send message to data node
            util.message_data_plane(
                rt_key,
                'hyperscale.storage.dm.version.delete',
                **message_body)

        except (exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception('Exception in delete snapshot')

    @utils.trace_method
    def create_volume_from_snapshot(self, volume, snapshot):
        """Create volume from snapshot."""

        LOG.debug("Create volume from snapshot")
        model_update = {}
        try:
            LOG.debug("Clone new volume %(t_id)s from snapshot with id"
                      " %(s_id)s", {"t_id": volume['id'],
                                    "s_id": volume['snapshot_id']})
            # 1. Make a call to DN
            # Check if current_dn_owner is set.
            # Route the snapshot creation request to current_dn_owner

            rt_key = None

            # Get metadata for volume
            snap_vol = snapshot['volume']
            metadata = snap_vol['metadata']
            rt_key = self._get_volume_metadata_value(metadata,
                                                     'current_dn_owner')
            if rt_key is None:
                rt_key = self.dn_routing_key

            util.message_data_plane(
                rt_key,
                'hyperscale.storage.dm.volume.clone.create',
                pool_name=POOL_NAME,
                display_name=util.get_guid_with_curly_brackets(
                    volume['id']),
                version_name=util.get_guid_with_curly_brackets(
                    volume['snapshot_id']),
                volume_raw_size=volume['size'],
                volume_qos=1,
                parent_volume_guid=util.get_guid_with_curly_brackets(
                    snapshot['volume_id']),
                user_id=util.get_guid_with_curly_brackets(
                    volume['user_id']),
                project_id=util.get_guid_with_curly_brackets(
                    volume['project_id']),
                volume_guid=util.get_guid_with_curly_brackets(
                    volume['id']))

            LOG.debug("Volume created successfully on data node")

            # Get metadata for volume
            volume_metadata = self._get_volume_metadata(volume)
            parent_cur_dn = self._get_volume_metadata_value(metadata,
                                                            'current_dn_ip')

            metadata_update = {}
            metadata_update['snapshot_id'] = snapshot['id']
            metadata_update['parent_volume_guid'] = (
                util.get_guid_with_curly_brackets(
                    snapshot['volume_id']))
            metadata_update['Primary_datanode_ip'] = parent_cur_dn
            metadata_update['current_dn_owner'] = rt_key
            metadata_update['current_dn_ip'] = parent_cur_dn

            # 2. Choose a potential replica here.
            # The actual decision to have potential replica is made in NOVA.
            rt_key, rt_dn_ip = self._select_rt(volume,
                                               volume_metadata,
                                               only_select=True)

            if rt_key and rt_dn_ip:
                metadata_update['Potential_secondary_key'] = rt_key
                metadata_update['Potential_secondary_ip'] = rt_dn_ip

        except (exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception('Exception in creating volume from snapshot')
        except exception.InvalidMetadataType:
            with excutils.save_and_reraise_exception():
                LOG.exception('Exception updating metadata in create'
                              ' volume from snapshot')

        volume_metadata.update(metadata_update)

        volume['provider_location'] = PROVIDER_LOCATION
        model_update = {'provider_location': volume['provider_location'],
                        'metadata': volume_metadata}

        return model_update

    @utils.trace_method
    def get_volume_stats(self, refresh=False):
        """Get volume status."""

        # If 'refresh' is True, update the stats first.

        LOG.debug("Get volume status")

        self._stats = self._fetch_volume_status()
        new_total = self._stats['total_capacity_gb']
        new_free = self._stats['free_capacity_gb']

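        # Report the new totals to the controller only when they have
        # changed since the last poll.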
        if self.old_total != new_total or self.old_free != new_free:
            self.old_total = new_total
            self.old_free = new_free

            message_body = {'hostname': self.datanode_hostname,
                            'is_admin': 1,
                            'total': new_total,
                            'free': new_free}
            try:
                cmd_out, cmd_error = util.message_controller(
                    constants.HS_CONTROLLER_EXCH,
                    'hyperscale.controller.set.datanode.storage.stats',
                    **message_body)
                LOG.debug("Response Message from Controller: %s",
                          cmd_out)

            except (exception.UnableToExecuteHyperScaleCmd,
                    exception.UnableToProcessHyperScaleCmdOutput):
                with excutils.save_and_reraise_exception():
                    LOG.exception('Exception during fetch stats')

        return self._stats

    @utils.trace_method
    def extend_volume(self, volume, size_gb):
        """Extend volume."""

        LOG.debug("Extend volume")
        try:
            message_body = {}
            message_body['volume_guid'] = (
                util.get_guid_with_curly_brackets(volume['id']))
            message_body['new_size'] = size_gb

            # Send Extend Volume message to Data Node
            util.message_data_plane(
                self.dn_routing_key,
                'hyperscale.storage.dm.volume.extend',
                **message_body)

        except (exception.UnableToProcessHyperScaleCmdOutput,
                exception.ErrorInSendingMsg):
            msg = _('Exception in extend volume %s') % volume['name']
            LOG.exception(msg)
            raise exception.VolumeDriverException(message=msg)

    def _fetch_volume_status(self):
        """Retrieve Volume Stats from Datanode."""

        LOG.debug("Request Volume Stats from Datanode")

        data = {}
        data["volume_backend_name"] = 'Veritas_HyperScale'
        data["vendor_name"] = 'Veritas Technologies LLC'
        data["driver_version"] = self.VERSION
        data["storage_protocol"] = 'nfs'
        data['total_capacity_gb'] = 0.0
        data['free_capacity_gb'] = 0.0
        data['reserved_percentage'] = self.configuration.reserved_percentage
        data['QoS_support'] = False

        try:
            message_body = {}
            # send message to data node
            cmd_out, cmd_error = util.message_data_plane(
                self.dn_routing_key,
                'hyperscale.storage.dm.discover.stats',
                **message_body)

            LOG.debug("Response Message from Datanode: %s", cmd_out)
            # Default to None in case the response does not carry these stats.
            total_capacity = None
            free_capacity = None
            payload = cmd_out.get('payload')
            if 'stats' in payload.keys():
                if 'total_capacity' in payload.get(
                        'stats')[0].keys():
                    total_capacity = payload.get(
                        'stats')[0]['total_capacity']

                if 'free_capacity' in payload.get(
                        'stats')[0].keys():
                    free_capacity = payload.get(
                        'stats')[0]['free_capacity']

                if total_capacity is not None:
                    data['total_capacity_gb'] = float(total_capacity)
                    data['free_capacity_gb'] = float(free_capacity)

        except (exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception('Exception during fetch stats')

        return data

    @utils.trace_method
    def initialize_connection(self, volume, connector):
        """Allow connection to connector and return connection info."""
        data = {'export': volume['provider_location'],
                'name': volume['name']}
        return {
            'driver_volume_type': 'veritas_hyperscale',
            'data': data
        }

    def terminate_connection(self, volume, connector, **kwargs):
        """Disallow connection from connector."""
        pass

    def ensure_export(self, ctx, volume):
        """Synchronously recreates an export for a logical volume."""
        pass

    def create_export(self, ctx, volume, connector):
        """Exports the volume.

        Can optionally return a dictionary of changes
        to the volume object to be persisted.
        """
        pass

    def remove_export(self, ctx, volume):
        """Removes an export for a logical volume."""
        pass

    @utils.trace_method
    def copy_image_to_volume(self, context, volume, image_service, image_id):
        """Fetch the image from image_service and write it to the volume."""

        LOG.debug("copy_image_to_volume volume: %(vol)s "
                  "image service: %(service)s image id: %(id)s.",
                  {'vol': volume,
                   'service': six.text_type(image_service),
                   'id': six.text_type(image_id)})

        path = util.get_image_path(image_id)
        try:
            # Skip image creation if file already exists
            if not os.path.isfile(path):
                image_utils.fetch_to_raw(context,
                                         image_service,
                                         image_id,
                                         path,
                                         BLOCK_SIZE,
                                         size=volume['size'])
            metadata = self._get_volume_metadata(volume)
            hs_img_id = self._get_volume_metadata_value(metadata,
                                                        'hs_image_id')
            util.update_image(path, volume['id'], hs_img_id)

        except (exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception('Failed to copy_image_to_volume')

    @utils.trace_method
    def copy_volume_to_image(self, context, volume, image_service, image_meta):
        """Copy the volume to the specified image."""

        LOG.debug("copy_volume_to_image volume: %(vol)s"
                  " image service:%(service)s image meta: %(meta)s.",
                  {'vol': volume,
                   'service': six.text_type(image_service),
                   'meta': six.text_type(image_meta)})

        try:
            metadata = self._get_volume_metadata(volume)
            hs_img_id = self._get_volume_metadata_value(metadata,
                                                        'hs_image_id')
            path = util.get_image_path(hs_img_id, 'volume')
            image_utils.upload_volume(context,
                                      image_service,
                                      image_meta,
                                      path)

        except (exception.UnableToExecuteHyperScaleCmd,
                exception.UnableToProcessHyperScaleCmdOutput):
            with excutils.save_and_reraise_exception():
                LOG.exception('Failed to copy_volume_to_image')