1# Copyright (c) 2012 OpenStack Foundation
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15"""Volume-related Utilities and helpers."""
16
17
18import ast
19import functools
20import json
21import math
22import operator
23from os import urandom
24import re
25import time
26import uuid
27
28from castellan.common.credentials import keystone_password
29from castellan.common import exception as castellan_exception
30from castellan import key_manager as castellan_key_manager
31import eventlet
32from eventlet import tpool
33from keystoneauth1 import loading as ks_loading
34from oslo_concurrency import processutils
35from oslo_config import cfg
36from oslo_log import log as logging
37from oslo_utils import strutils
38from oslo_utils import timeutils
39from oslo_utils import units
40from random import shuffle
41import six
42from six.moves import range
43
44from cinder.brick.local_dev import lvm as brick_lvm
45from cinder import context
46from cinder import db
47from cinder import exception
48from cinder.i18n import _
49from cinder import objects
50from cinder import rpc
51from cinder import utils
52from cinder.volume import group_types
53from cinder.volume import throttling
54from cinder.volume import volume_types
55
56
57CONF = cfg.CONF
58
59LOG = logging.getLogger(__name__)
60
61
def null_safe_str(s):
    """Return ``str(s)`` for truthy values, otherwise the empty string."""
    if not s:
        return ''
    return str(s)
64
65
def _usage_from_volume(context, volume_ref, **kw):
    """Build the usage-notification payload for a volume.

    :param context: request context used for the attachment and glance
                    metadata DB lookups
    :param volume_ref: volume DB record (supports dict-style access)
    :param kw: extra key/value pairs merged into (and able to override)
               the payload
    :returns: dict of usage information
    """
    now = timeutils.utcnow()
    # Fall back to "now" when the timestamps were never recorded.
    launched_at = volume_ref['launched_at'] or now
    created_at = volume_ref['created_at'] or now
    volume_status = volume_ref['status']
    # Consumers only see the plain 'deleting' status for volumes that
    # errored while being unmanaged+deleted.
    if volume_status == 'error_managing_deleting':
        volume_status = 'deleting'
    usage_info = dict(
        tenant_id=volume_ref['project_id'],
        host=volume_ref['host'],
        user_id=volume_ref['user_id'],
        availability_zone=volume_ref['availability_zone'],
        volume_id=volume_ref['id'],
        volume_type=volume_ref['volume_type_id'],
        display_name=volume_ref['display_name'],
        launched_at=launched_at.isoformat(),
        created_at=created_at.isoformat(),
        status=volume_status,
        snapshot_id=volume_ref['snapshot_id'],
        size=volume_ref['size'],
        replication_status=volume_ref['replication_status'],
        replication_extended_status=volume_ref['replication_extended_status'],
        replication_driver_data=volume_ref['replication_driver_data'],
        metadata=volume_ref.get('volume_metadata'),)

    usage_info.update(kw)
    try:
        attachments = db.volume_attachment_get_all_by_volume_id(
            context, volume_ref['id'])
        usage_info['volume_attachment'] = attachments

        glance_meta = db.volume_glance_metadata_get(context, volume_ref['id'])
        if glance_meta:
            usage_info['glance_metadata'] = glance_meta
    except exception.GlanceMetadataNotFound:
        # Volumes without glance metadata are normal; the payload simply
        # omits the 'glance_metadata' key.
        pass
    except exception.VolumeNotFound:
        LOG.debug("Can not find volume %s at notify usage", volume_ref['id'])

    return usage_info
106
107
def _usage_from_backup(backup, **kw):
    """Build the usage-notification payload for a backup object.

    Extra keyword arguments are merged into (and may override) the
    payload.
    """
    usage_info = {
        'tenant_id': backup.project_id,
        'user_id': backup.user_id,
        'availability_zone': backup.availability_zone,
        'backup_id': backup.id,
        'host': backup.host,
        'display_name': backup.display_name,
        'created_at': backup.created_at.isoformat(),
        'status': backup.status,
        'volume_id': backup.volume_id,
        'size': backup.size,
        'service_metadata': backup.service_metadata,
        'service': backup.service,
        'fail_reason': backup.fail_reason,
        'parent_id': backup.parent_id,
        'num_dependent_backups': backup.num_dependent_backups,
        'snapshot_id': backup.snapshot_id,
    }
    usage_info.update(kw)
    return usage_info
130
131
@utils.if_notifications_enabled
def notify_about_volume_usage(context, volume, event_suffix,
                              extra_usage_info=None, host=None):
    """Emit a 'volume.<event_suffix>' INFO notification for a volume."""
    usage_info = _usage_from_volume(context, volume,
                                    **(extra_usage_info or {}))
    rpc.get_notifier("volume", host or CONF.host).info(
        context, 'volume.%s' % event_suffix, usage_info)
145
146
@utils.if_notifications_enabled
def notify_about_backup_usage(context, backup, event_suffix,
                              extra_usage_info=None,
                              host=None):
    """Emit a 'backup.<event_suffix>' INFO notification for a backup."""
    usage_info = _usage_from_backup(backup, **(extra_usage_info or {}))
    rpc.get_notifier("backup", host or CONF.host).info(
        context, 'backup.%s' % event_suffix, usage_info)
161
162
def _usage_from_snapshot(snapshot, context, **extra_usage_info):
    """Build the usage-notification payload for a snapshot.

    The source volume may already be deleted, so deleted rows are read
    as well in order to fill in the availability zone.
    """
    # NOTE(niedbalski): a snapshot might be related to a deleted
    # volume, if that's the case, the volume information is still
    # required for filling the usage_info, so we enforce to read
    # the volume data even if the volume has been deleted.
    context.read_deleted = "yes"
    volume = db.volume_get(context, snapshot.volume_id)
    usage_info = dict(
        tenant_id=snapshot.project_id,
        user_id=snapshot.user_id,
        availability_zone=volume['availability_zone'],
        volume_id=snapshot.volume_id,
        volume_size=snapshot.volume_size,
        snapshot_id=snapshot.id,
        display_name=snapshot.display_name,
        created_at=snapshot.created_at.isoformat(),
        status=snapshot.status,
        deleted=null_safe_str(snapshot.deleted),
        metadata=null_safe_str(snapshot.metadata))
    usage_info.update(extra_usage_info)
    return usage_info
186
187
@utils.if_notifications_enabled
def notify_about_snapshot_usage(context, snapshot, event_suffix,
                                extra_usage_info=None, host=None):
    """Emit a 'snapshot.<event_suffix>' INFO notification for a snapshot."""
    usage_info = _usage_from_snapshot(snapshot, context,
                                      **(extra_usage_info or {}))
    rpc.get_notifier('snapshot', host or CONF.host).info(
        context, 'snapshot.%s' % event_suffix, usage_info)
202
203
def _usage_from_capacity(capacity, **extra_usage_info):
    """Build the usage-notification payload from a capacity mapping.

    Only the well-known capacity fields are copied; extra keyword
    arguments are merged in afterwards.
    """
    fields = ('name_to_id', 'total', 'free', 'allocated', 'provisioned',
              'virtual_free', 'reported_at')
    capacity_info = {field: capacity[field] for field in fields}
    capacity_info.update(extra_usage_info)
    return capacity_info
218
219
@utils.if_notifications_enabled
def notify_about_capacity_usage(context, capacity, suffix,
                                extra_usage_info=None, host=None):
    """Emit a 'capacity.<suffix>' INFO notification for backend capacity."""
    usage_info = _usage_from_capacity(capacity, **(extra_usage_info or {}))
    rpc.get_notifier('capacity', host or CONF.host).info(
        context, 'capacity.%s' % suffix, usage_info)
234
235
@utils.if_notifications_enabled
def notify_about_replication_usage(context, volume, suffix,
                                   extra_usage_info=None, host=None):
    """Emit a 'replication.<suffix>' INFO notification for a volume."""
    usage_info = _usage_from_volume(context, volume,
                                    **(extra_usage_info or {}))
    rpc.get_notifier('replication', host or CONF.host).info(
        context, 'replication.%s' % suffix, usage_info)
251
252
@utils.if_notifications_enabled
def notify_about_replication_error(context, volume, suffix,
                                   extra_error_info=None, host=None):
    """Emit a 'replication.<suffix>' ERROR notification for a volume."""
    usage_info = _usage_from_volume(context, volume,
                                    **(extra_error_info or {}))
    rpc.get_notifier('replication', host or CONF.host).error(
        context, 'replication.%s' % suffix, usage_info)
268
269
def _usage_from_consistencygroup(group_ref, **kw):
    """Build the usage-notification payload for a consistency group."""
    usage_info = {
        'tenant_id': group_ref.project_id,
        'user_id': group_ref.user_id,
        'availability_zone': group_ref.availability_zone,
        'consistencygroup_id': group_ref.id,
        'name': group_ref.name,
        'created_at': group_ref.created_at.isoformat(),
        'status': group_ref.status,
    }
    usage_info.update(kw)
    return usage_info
281
282
@utils.if_notifications_enabled
def notify_about_consistencygroup_usage(context, group, event_suffix,
                                        extra_usage_info=None, host=None):
    """Emit a 'consistencygroup.<event_suffix>' INFO notification."""
    usage_info = _usage_from_consistencygroup(group,
                                              **(extra_usage_info or {}))
    rpc.get_notifier("consistencygroup", host or CONF.host).info(
        context, 'consistencygroup.%s' % event_suffix, usage_info)
299
300
def _usage_from_group(group_ref, **kw):
    """Build the usage-notification payload for a generic volume group."""
    usage_info = {
        'tenant_id': group_ref.project_id,
        'user_id': group_ref.user_id,
        'availability_zone': group_ref.availability_zone,
        'group_id': group_ref.id,
        'group_type': group_ref.group_type_id,
        'name': group_ref.name,
        'created_at': group_ref.created_at.isoformat(),
        'status': group_ref.status,
    }
    usage_info.update(kw)
    return usage_info
313
314
@utils.if_notifications_enabled
def notify_about_group_usage(context, group, event_suffix,
                             extra_usage_info=None, host=None):
    """Emit a 'group.<event_suffix>' INFO notification for a group."""
    usage_info = _usage_from_group(group, **(extra_usage_info or {}))
    rpc.get_notifier("group", host or CONF.host).info(
        context, 'group.%s' % event_suffix, usage_info)
331
332
def _usage_from_cgsnapshot(cgsnapshot, **kw):
    """Build the usage-notification payload for a cgsnapshot."""
    usage_info = {
        'tenant_id': cgsnapshot.project_id,
        'user_id': cgsnapshot.user_id,
        'cgsnapshot_id': cgsnapshot.id,
        'name': cgsnapshot.name,
        'consistencygroup_id': cgsnapshot.consistencygroup_id,
        'created_at': cgsnapshot.created_at.isoformat(),
        'status': cgsnapshot.status,
    }
    usage_info.update(kw)
    return usage_info
345
346
def _usage_from_group_snapshot(group_snapshot, **kw):
    """Build the usage-notification payload for a group snapshot."""
    usage_info = {
        'tenant_id': group_snapshot.project_id,
        'user_id': group_snapshot.user_id,
        'group_snapshot_id': group_snapshot.id,
        'name': group_snapshot.name,
        'group_id': group_snapshot.group_id,
        'group_type': group_snapshot.group_type_id,
        'created_at': group_snapshot.created_at.isoformat(),
        'status': group_snapshot.status,
    }
    usage_info.update(kw)
    return usage_info
360
361
@utils.if_notifications_enabled
def notify_about_cgsnapshot_usage(context, cgsnapshot, event_suffix,
                                  extra_usage_info=None, host=None):
    """Emit a 'cgsnapshot.<event_suffix>' INFO notification."""
    usage_info = _usage_from_cgsnapshot(cgsnapshot,
                                        **(extra_usage_info or {}))
    rpc.get_notifier("cgsnapshot", host or CONF.host).info(
        context, 'cgsnapshot.%s' % event_suffix, usage_info)
378
379
@utils.if_notifications_enabled
def notify_about_group_snapshot_usage(context, group_snapshot, event_suffix,
                                      extra_usage_info=None, host=None):
    """Emit a 'group_snapshot.<event_suffix>' INFO notification."""
    usage_info = _usage_from_group_snapshot(group_snapshot,
                                            **(extra_usage_info or {}))
    rpc.get_notifier("group_snapshot", host or CONF.host).info(
        context, 'group_snapshot.%s' % event_suffix, usage_info)
396
397
def _check_blocksize(blocksize):
    """Validate a dd blocksize string; fall back to the default if invalid.

    Returns the given blocksize when valid, otherwise clears any
    'volume_dd_blocksize' override and returns the configured default.
    """
    try:
        # Zero-sized, negative and fractional blocksizes cannot be
        # caught by strutils, so rule them out up front.
        if '.' in blocksize or blocksize.startswith(('-', '0')):
            raise ValueError
        strutils.string_to_bytes('%sB' % blocksize)
        return blocksize
    except ValueError:
        LOG.warning("Incorrect value error: %(blocksize)s, "
                    "it may indicate that \'volume_dd_blocksize\' "
                    "was configured incorrectly. Fall back to default.",
                    {'blocksize': blocksize})
        # Fall back to default blocksize
        CONF.clear_override('volume_dd_blocksize')
        return CONF.volume_dd_blocksize
417
418
def check_for_odirect_support(src, dest, flag='oflag=direct'):
    """Probe whether dd supports O_DIRECT between src and dest."""
    # iflag=direct combined with if=/dev/zero does not work
    # (dd: failed to open '/dev/zero': Invalid argument).
    if src == '/dev/zero' and flag == 'iflag=direct':
        return False
    try:
        utils.execute('dd', 'count=0', 'if=%s' % src, 'of=%s' % dest,
                      flag, run_as_root=True)
    except processutils.ProcessExecutionError:
        return False
    return True
434
435
def _copy_volume_with_path(prefix, srcstr, deststr, size_in_m, blocksize,
                           sync=False, execute=utils.execute, ionice=None,
                           sparse=False):
    """Copy size_in_m MiB from srcstr to deststr using dd.

    :param prefix: list of command tokens to prepend (e.g. a throttle
                   wrapper command); copied so the caller's list is not
                   mutated
    :param srcstr: source path
    :param deststr: destination path
    :param size_in_m: amount of data to copy, in MiB
    :param blocksize: dd blocksize (validated; may be replaced by the
                      configured default)
    :param sync: request data be persisted before returning (only adds
                 conv=fdatasync when O_DIRECT is not in use)
    :param execute: execution function, defaults to utils.execute
    :param ionice: optional ionice arguments to prepend
    :param sparse: pass conv=sparse so dd skips writing zero blocks
    """
    cmd = prefix[:]

    if ionice:
        cmd.extend(('ionice', ionice))

    blocksize = _check_blocksize(blocksize)
    size_in_bytes = size_in_m * units.Mi

    # count is a byte count here because iflag=count_bytes is always
    # appended below.
    cmd.extend(('dd', 'if=%s' % srcstr, 'of=%s' % deststr,
                'count=%d' % size_in_bytes, 'bs=%s' % blocksize))

    # Use O_DIRECT to avoid thrashing the system buffer cache
    odirect = check_for_odirect_support(srcstr, deststr, 'iflag=direct')

    cmd.append('iflag=count_bytes,direct' if odirect else 'iflag=count_bytes')

    if check_for_odirect_support(srcstr, deststr, 'oflag=direct'):
        cmd.append('oflag=direct')
        odirect = True

    # If the volume is being unprovisioned then
    # request the data is persisted before returning,
    # so that it's not discarded from the cache.
    conv = []
    if sync and not odirect:
        conv.append('fdatasync')
    if sparse:
        conv.append('sparse')
    if conv:
        conv_options = 'conv=' + ",".join(conv)
        cmd.append(conv_options)

    # Perform the copy
    start_time = timeutils.utcnow()
    execute(*cmd, run_as_root=True)
    duration = timeutils.delta_seconds(start_time, timeutils.utcnow())

    # NOTE(jdg): use a default of 1, mostly for unit test, but in
    # some incredible event this is 0 (cirros image?) don't barf
    if duration < 1:
        duration = 1
    mbps = (size_in_m / duration)
    LOG.debug("Volume copy details: src %(src)s, dest %(dest)s, "
              "size %(sz).2f MB, duration %(duration).2f sec",
              {"src": srcstr,
               "dest": deststr,
               "sz": size_in_m,
               "duration": duration})
    LOG.info("Volume copy %(size_in_m).2f MB at %(mbps).2f MB/s",
             {'size_in_m': size_in_m, 'mbps': mbps})
489
490
def _open_volume_with_path(path, mode):
    """Open the volume device at *path*, returning the file handle.

    Returns None when the open fails (the error is logged); callers
    must check for a None result.
    """
    try:
        # temporary_chown makes the device accessible for the duration
        # of the open; the returned handle remains usable after the
        # ownership is restored on context exit.
        with utils.temporary_chown(path):
            handle = open(path, mode)
            return handle
    except Exception:
        # NOTE(review): intentionally broad -- any failure is reported
        # as an unavailable device by returning None; confirm callers
        # expect None rather than an exception here.
        LOG.error("Failed to open volume from %(path)s.", {'path': path})
498
499
def _transfer_data(src, dest, length, chunk_size):
    """Transfer data between files (Python IO objects).

    Reads up to *length* bytes from *src* and writes them to *dest* in
    chunks of at most *chunk_size* bytes, flushing *dest* at the end.
    Blocking reads/writes are dispatched via tpool so greenthreads are
    not starved.
    """

    # NOTE(review): under Python 2 this is floor division for ints --
    # presumably length is always a multiple of chunk_size here; verify
    # against callers.
    chunks = int(math.ceil(length / chunk_size))
    remaining_length = length

    LOG.debug("%(chunks)s chunks of %(bytes)s bytes to be transferred.",
              {'chunks': chunks, 'bytes': chunk_size})

    for chunk in range(0, chunks):
        before = time.time()
        data = tpool.execute(src.read, min(chunk_size, remaining_length))

        # If we have reached end of source, discard any extraneous bytes from
        # destination volume if trim is enabled and stop writing.
        if data == b'':
            break

        tpool.execute(dest.write, data)
        remaining_length -= len(data)
        delta = (time.time() - before)
        rate = (chunk_size / delta) / units.Ki
        LOG.debug("Transferred chunk %(chunk)s of %(chunks)s (%(rate)dK/s).",
                  {'chunk': chunk + 1, 'chunks': chunks, 'rate': rate})

        # yield to any other pending operations
        eventlet.sleep(0)

    tpool.execute(dest.flush)
529
530
def _copy_volume_with_file(src, dest, size_in_m):
    """Copy size_in_m MiB of data between two volumes via file handles.

    *src* and *dest* may each be a path string (opened here and closed
    again when done) or an already-open file-like object (left open).

    :raises exception.DeviceUnavailable: if either side cannot be opened
    """
    src_handle = src
    if isinstance(src, six.string_types):
        src_handle = _open_volume_with_path(src, 'rb')

    dest_handle = dest
    if isinstance(dest, six.string_types):
        dest_handle = _open_volume_with_path(dest, 'wb')

    # _open_volume_with_path returns None on failure.
    if not src_handle:
        raise exception.DeviceUnavailable(
            _("Failed to copy volume, source device unavailable."))

    if not dest_handle:
        raise exception.DeviceUnavailable(
            _("Failed to copy volume, destination device unavailable."))

    start_time = timeutils.utcnow()

    # Copy in 4 MiB chunks.
    _transfer_data(src_handle, dest_handle, size_in_m * units.Mi, units.Mi * 4)

    # Clamp to 1 second so the throughput computation below never
    # divides by zero.
    duration = max(1, timeutils.delta_seconds(start_time, timeutils.utcnow()))

    # Only close handles this function opened itself.
    if isinstance(src, six.string_types):
        src_handle.close()
    if isinstance(dest, six.string_types):
        dest_handle.close()

    mbps = (size_in_m / duration)
    LOG.info("Volume copy completed (%(size_in_m).2f MB at "
             "%(mbps).2f MB/s).",
             {'size_in_m': size_in_m, 'mbps': mbps})
563
564
def copy_volume(src, dest, size_in_m, blocksize, sync=False,
                execute=utils.execute, ionice=None, throttle=None,
                sparse=False):
    """Copy data from the source volume to the destination volume.

    'src' and 'dest' are typically filesystem paths (str).  Connectors
    may instead supply RawIOBase-like handles for volumes that are not
    available on the local filesystem; anything that supports read and
    write is accepted.

    When both sides are paths, the copy is performed with dd (optionally
    throttled).  Otherwise the handles are treated as file objects and
    copied directly -- throttling is unavailable in that mode.
    """
    src_is_path = isinstance(src, six.string_types)
    dest_is_path = isinstance(dest, six.string_types)

    if not (src_is_path and dest_is_path):
        _copy_volume_with_file(src, dest, size_in_m)
        return

    if not throttle:
        throttle = throttling.Throttle.get_default()
    with throttle.subcommand(src, dest) as throttle_cmd:
        _copy_volume_with_path(throttle_cmd['prefix'], src, dest,
                               size_in_m, blocksize, sync=sync,
                               execute=execute, ionice=ionice,
                               sparse=sparse)
592
593
def clear_volume(volume_size, volume_path, volume_clear=None,
                 volume_clear_size=None, volume_clear_ionice=None,
                 throttle=None):
    """Unprovision old volumes to prevent data leaking between users."""
    if volume_clear is None:
        volume_clear = CONF.volume_clear
    if volume_clear_size is None:
        volume_clear_size = CONF.volume_clear_size
    # A configured size of 0 means "wipe the whole volume".
    if volume_clear_size == 0:
        volume_clear_size = volume_size
    if volume_clear_ionice is None:
        volume_clear_ionice = CONF.volume_clear_ionice

    LOG.info("Performing secure delete on volume: %s", volume_path)

    if volume_clear != 'zero':
        raise exception.InvalidConfigurationValue(
            option='volume_clear',
            value=volume_clear)

    # sparse=False so zero blocks are actually written out rather than
    # skipped -- the whole point is to overwrite the data.
    return copy_volume('/dev/zero', volume_path, volume_clear_size,
                       CONF.volume_dd_blocksize,
                       sync=True, execute=utils.execute,
                       ionice=volume_clear_ionice,
                       throttle=throttle, sparse=False)
624
625
def supports_thin_provisioning():
    """Return whether LVM on this host supports thin provisioning."""
    root_helper = utils.get_root_helper()
    return brick_lvm.LVM.supports_thin_provisioning(root_helper)
629
630
def get_all_physical_volumes(vg_name=None):
    """List LVM physical volumes, optionally limited to one volume group."""
    root_helper = utils.get_root_helper()
    return brick_lvm.LVM.get_all_physical_volumes(root_helper, vg_name)
635
636
def get_all_volume_groups(vg_name=None):
    """List LVM volume groups, optionally filtered by name."""
    root_helper = utils.get_root_helper()
    return brick_lvm.LVM.get_all_volume_groups(root_helper, vg_name)
641
642# Default symbols to use for passwords. Avoids visually confusing characters.
643# ~6 bits per symbol
# Default symbols to use for passwords. Avoids visually confusing characters.
# ~6 bits per symbol
DEFAULT_PASSWORD_SYMBOLS = ('23456789',  # Removed: 0,1
                            'ABCDEFGHJKLMNPQRSTUVWXYZ',   # Removed: I, O
                            'abcdefghijkmnopqrstuvwxyz')  # Removed: l


def generate_password(length=16, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
    """Generate a random password from the supplied symbol groups.

    At least one symbol from each group will be included. Unpredictable
    results if length is less than the number of symbol groups.

    Believed to be reasonably secure (with a reasonable password length!)

    :param length: number of characters in the returned password
    :param symbolgroups: iterable of strings; one character from each
                         group is included (subject to truncation when
                         length < number of groups)
    :returns: the generated password string
    """

    def _pick(symbols):
        # One byte of OS entropy per choice.  NOTE: 256 is usually not a
        # multiple of len(symbols), so this carries a slight modulo bias,
        # same as the original implementation.
        return symbols[ord(urandom(1)) % len(symbols)]

    # NOTE(jerdfelt): Some password policies require at least one character
    # from each group of symbols, so start off with one random character
    # from each symbol group
    password = [_pick(group) for group in symbolgroups]

    # If length < len(symbolgroups), the leading characters will only
    # be from the first length groups. Try our best to not be predictable
    # by shuffling and then truncating.
    shuffle(password)
    password = password[:length]
    length -= len(password)

    # then fill with random characters from all symbol groups
    all_symbols = ''.join(symbolgroups)
    password.extend([_pick(all_symbols) for _i in range(length)])

    # finally shuffle to ensure first x characters aren't from a
    # predictable group
    shuffle(password)

    return ''.join(password)
684
685
def generate_username(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
    """Generate a random username from the supplied symbol groups.

    Delegates to generate_password, so the same charset and per-group
    inclusion guarantees apply; only the default length differs.
    """
    # Use the same implementation as the password generation.
    return generate_password(length, symbolgroups)
689
690
691DEFAULT_POOL_NAME = '_pool0'
692
693
def extract_host(host, level='backend', default_pool_name=False):
    """Extract Host, Backend or Pool information from host string.

    :param host: String for host, which could include host@backend#pool info
    :param level: Indicate which level of information should be extracted
                  from host string. Level can be 'host', 'backend' or 'pool',
                  default value is 'backend'
    :param default_pool_name: this flag specify what to do if level == 'pool'
                              and there is no 'pool' info encoded in host
                              string.  default_pool_name=True will return
                              DEFAULT_POOL_NAME, otherwise we return None.
                              Default value of this parameter is False.
    :return: expected information, string or None
    :raises: exception.InvalidVolume

    For example:
        host = 'HostA@BackendB#PoolC'
        ret = extract_host(host, 'host')
        # ret is 'HostA'
        ret = extract_host(host, 'backend')
        # ret is 'HostA@BackendB'
        ret = extract_host(host, 'pool')
        # ret is 'PoolC'

        host = 'HostX@BackendY'
        ret = extract_host(host, 'pool')
        # ret is None
        ret = extract_host(host, 'pool', True)
        # ret is '_pool0'
    """
    if host is None:
        msg = _("volume is not assigned to a host")
        raise exception.InvalidVolume(reason=msg)

    # Everything before '#' is host@backend; the part after is the pool.
    backend = host.split('#')[0]
    if level == 'host':
        # Strip the '@backend' suffix as well.
        return backend.split('@')[0]
    if level == 'backend':
        return backend
    if level == 'pool':
        parts = host.split('#')
        if len(parts) == 2:
            return parts[1]
        if default_pool_name is True:
            return DEFAULT_POOL_NAME
        return None
743
744
def append_host(host, pool):
    """Encode pool into host info.

    Returns the host unchanged when either value is empty.
    """
    if host and pool:
        return '#'.join((host, pool))
    return host
752
753
def matching_backend_name(src_volume_type, volume_type):
    """Return True when both volume types name the same non-empty backend."""
    src_backend = src_volume_type.get('volume_backend_name')
    dst_backend = volume_type.get('volume_backend_name')
    if not (src_backend and dst_backend):
        return False
    return src_backend == dst_backend
761
762
def hosts_are_equivalent(host_1, host_2):
    """Compare two host strings at backend granularity."""
    # When either side is None/empty, fall back to a plain comparison.
    if not host_1 or not host_2:
        return host_1 == host_2
    return extract_host(host_1) == extract_host(host_2)
768
769
def read_proc_mounts():
    """Read the /proc/mounts file.

    It's a dummy function but it eases the writing of unit tests as mocking
    __builtin__open() for a specific file only is not trivial.
    """
    with open('/proc/mounts') as mounts_file:
        return list(mounts_file)
778
779
def extract_id_from_volume_name(vol_name):
    """Return the UUID encoded in a backend volume name, or None."""
    pattern = CONF.volume_name_template.replace('%s', '(?P<uuid>.+)')
    match = re.match(pattern, vol_name)
    return match.group('uuid') if match else None
785
786
def check_already_managed_volume(vol_id):
    """Check cinder db for already managed volume.

    :param vol_id: volume id parameter
    :returns: bool -- return True, if db entry with specified
                      volume id exists, otherwise return False
    """
    # NOTE(review): the chained 'and' returns the first falsy operand,
    # so for a falsy vol_id this actually returns that value (e.g. None)
    # rather than a strict bool; callers rely only on truthiness.
    try:
        return (vol_id and isinstance(vol_id, six.string_types) and
                uuid.UUID(vol_id, version=4) and
                objects.Volume.exists(context.get_admin_context(), vol_id))
    except ValueError:
        # uuid.UUID raises ValueError for malformed ids -- treat those
        # as "not managed".
        return False
800
801
def extract_id_from_snapshot_name(snap_name):
    """Return a snapshot's ID from its name on the backend."""
    pattern = CONF.snapshot_name_template.replace('%s', '(?P<uuid>.+)')
    match = re.match(pattern, snap_name)
    return match.group('uuid') if match else None
808
809
def paginate_entries_list(entries, marker, limit, offset, sort_keys,
                          sort_dirs):
    """Paginate a list of entries.

    :param entries: list of dictionaries
    :param marker: the last element previously returned
    :param limit: the maximum number of items to return
    :param offset: the number of items to skip from the marker or from
                   the first element
    :param sort_keys: a list of keys in the dictionaries to sort by
    :param sort_dirs: a list of sort directions, one per sort key, where
                      each is either 'asc' or descending otherwise
    """
    getters = [(operator.itemgetter(key.strip()), direction)
               for key, direction in zip(sort_keys, sort_dirs)]

    def _cmp(left, right):
        for getter, direction in getters:
            l_val = getter(left)
            r_val = getter(right)
            # Dict values compare by their smallest member.
            if isinstance(l_val, dict):
                l_val = sorted(l_val.values())[0]
            if isinstance(r_val, dict):
                r_val = sorted(r_val.values())[0]
            if l_val == r_val:
                continue
            if direction == 'asc':
                return -1 if l_val < r_val else 1
            return -1 if l_val > r_val else 1
        return 0

    sorted_entries = sorted(entries, key=functools.cmp_to_key(_cmp))

    if offset is None:
        offset = 0

    start_index = 0
    if marker:
        if not isinstance(marker, dict):
            try:
                marker = json.loads(marker)
            except ValueError:
                msg = _('marker %s can not be analysed, please use json like '
                        'format') % marker
                raise exception.InvalidInput(reason=msg)
        # Resume right after the entry matching the marker.
        start_index = -1
        for index, entry in enumerate(sorted_entries):
            if entry['reference'] == marker:
                start_index = index + 1
                break
        if start_index < 0:
            msg = _('marker not found: %s') % marker
            raise exception.InvalidInput(reason=msg)

    begin = start_index + offset
    return sorted_entries[begin:begin + limit]
864
865
def convert_config_string_to_dict(config_string):
    """Convert config file replication string to a dict.

    The only supported form is as follows:
    "{'key-1'='val-1' 'key-2'='val-2'...}"

    :param config_string: Properly formatted string to convert to dict.
    :response: dict of string values
    """
    parsed = {}

    try:
        # Rewrite the "'k'='v' ..." form into a valid dict literal and let
        # ast evaluate it safely (no arbitrary code execution).
        normalized = config_string.replace("=", ":").replace(" ", ", ")
        parsed = ast.literal_eval(normalized)
    except Exception:
        # Best-effort conversion: a malformed string yields an empty dict.
        LOG.warning("Error encountered translating config_string: "
                    "%(config_string)s to dict",
                    {'config_string': config_string})

    return parsed
888
889
def create_encryption_key(context, key_manager, volume_type_id):
    """Create a new encryption key for an encrypted volume type.

    :param context: request context
    :param key_manager: castellan key manager API object
    :param volume_type_id: id of the volume type being created
    :returns: the new key's id, or None if the type is not encrypted
    :raises Invalid: if the key manager fails to create the key
    """
    if not volume_types.is_encrypted(context, volume_type_id):
        return None

    encryption_spec = volume_types.get_volume_type_encryption(context,
                                                              volume_type_id)
    cipher = encryption_spec.cipher
    # e.g. 'aes-xts-plain64' -> algorithm 'aes'
    algorithm = cipher.split('-')[0] if cipher else None
    try:
        return key_manager.create_key(context,
                                      algorithm=algorithm,
                                      length=encryption_spec.key_size)
    except castellan_exception.KeyManagerError:
        # The messaging back to the client here is
        # purposefully terse, so we don't leak any sensitive
        # details.
        LOG.exception("Key manager error")
        raise exception.Invalid(message="Key manager error")
912
913
def delete_encryption_key(context, key_manager, encryption_key_id):
    """Delete the secret backing an encrypted volume.

    Tries the deletion with the caller's context first.  If that fails with a
    generic key manager error, retries once with credentials built from
    cinder's own [keystone_authtoken] config section.  A "not found" response
    is treated as success (the key is already gone).

    :param context: request context for the first deletion attempt
    :param key_manager: castellan key manager API object
    :param encryption_key_id: id of the secret to delete
    """
    try:
        key_manager.delete(context, encryption_key_id)
    except castellan_exception.ManagedObjectNotFoundError:
        # Already deleted; deletion is idempotent.
        pass
    except castellan_exception.KeyManagerError:
        LOG.info("First attempt to delete key id %s failed, retrying with "
                 "cinder's service context.", encryption_key_id)
        conf = CONF
        # Make sure the keystone_authtoken options are registered before we
        # read them below.
        ks_loading.register_auth_conf_options(conf, 'keystone_authtoken')
        # Build a password credential for cinder's own service account.
        service_context = keystone_password.KeystonePassword(
            password=conf.keystone_authtoken.password,
            auth_url=conf.keystone_authtoken.auth_url,
            username=conf.keystone_authtoken.username,
            user_domain_name=conf.keystone_authtoken.user_domain_name,
            project_name=conf.keystone_authtoken.project_name,
            project_domain_name=conf.keystone_authtoken.project_domain_name)
        try:
            # Retry with the service credentials; a missing key is fine here
            # too.  Any other KeyManagerError propagates to the caller.
            castellan_key_manager.API(conf).delete(service_context,
                                                   encryption_key_id)
        except castellan_exception.ManagedObjectNotFoundError:
            pass
936
937
def clone_encryption_key(context, key_manager, encryption_key_id):
    """Store a copy of an existing secret and return the copy's id.

    :param context: request context
    :param key_manager: castellan key manager API object
    :param encryption_key_id: id of the secret to clone, or None
    :returns: id of the cloned secret, or None if there was no source key
    """
    if encryption_key_id is None:
        return None
    source_key = key_manager.get(context, encryption_key_id)
    return key_manager.store(context, source_key)
945
946
def is_replicated_str(spec_str):
    """Check if an extra-spec value string means replication is enabled.

    :param spec_str: extra-spec value, expected form '<is> True'
    :returns: True only when the value is '<is>' followed by a truthy word
    """
    # NOTE: parameter renamed from ``str`` so it no longer shadows the
    # builtin; existing positional call sites are unaffected.
    spec = (spec_str or '').split()
    return (len(spec) == 2 and
            spec[0] == '<is>' and strutils.bool_from_string(spec[1]))
951
952
def is_replicated_spec(extra_specs):
    """Check extra-specs for an enabled 'replication_enabled' flag.

    :param extra_specs: dict of extra-specs (may be None/empty)
    :returns: whether replication is enabled; a falsy ``extra_specs`` is
              returned as-is, matching the original truthiness chaining
    """
    if not extra_specs:
        return extra_specs
    return is_replicated_str(extra_specs.get('replication_enabled'))
956
957
def group_get_by_id(group_id):
    """Fetch a group DB record by its id using an admin context.

    :param group_id: id of the group to look up
    :returns: the group DB record
    """
    admin_ctxt = context.get_admin_context()
    return db.group_get(admin_ctxt, group_id)
962
963
def is_group_a_cg_snapshot_type(group_or_snap):
    """Check whether a group (or group snapshot) is of CG-snapshot type.

    :param group_or_snap: mapping with a 'group_type_id' entry
    :returns: True if the group type's 'consistent_group_snapshot_enabled'
              spec equals '<is> True'
    """
    LOG.debug("Checking if %s is a consistent snapshot group",
              group_or_snap)
    type_id = group_or_snap["group_type_id"]
    if type_id is None:
        return False
    spec = group_types.get_group_type_specs(
        type_id,
        key="consistent_group_snapshot_enabled"
    )
    return spec == "<is> True"
974
975
def is_group_a_type(group, key):
    """Check whether a group's type enables the given boolean spec.

    :param group: group object with a ``group_type_id`` attribute
    :param key: name of the group-type spec to check
    :returns: True if the spec value equals '<is> True'
    """
    if group.group_type_id is None:
        return False
    spec_value = group_types.get_group_type_specs(group.group_type_id,
                                                  key=key)
    return spec_value == "<is> True"
983
984
def get_max_over_subscription_ratio(str_value, supports_auto=False):
    """Get the max_over_subscription_ratio from a string

    As some drivers need to do some calculations with the value and we are now
    receiving a string value in the conf, this converts the value to float
    when appropriate.

    :param str_value: Configuration string, either 'auto' or a float-parseable
                      number greater than or equal to 1.
    :param supports_auto: Tell if the calling driver supports auto MOSR.
    :response: the string 'auto' or the float value of mosr
    :raises VolumeDriverException: if 'auto' is given but not supported
    :raises InvalidParameterValue: if the numeric value is below 1
    """
    # Fixes: the docstring used to document a nonexistent ``drv_msg``
    # parameter, and 'auto' was compared twice; both checks are folded into
    # a single branch with identical behavior.
    if str_value == "auto":
        if not supports_auto:
            msg = _("This driver does not support automatic "
                    "max_over_subscription_ratio calculation. Please use a "
                    "valid float value.")
            LOG.error(msg)
            raise exception.VolumeDriverException(message=msg)
        # The driver computes the ratio itself; pass 'auto' through.
        return str_value

    # NOTE: may raise ValueError on non-numeric input, as before.
    mosr = float(str_value)
    if mosr < 1:
        msg = _("The value of max_over_subscription_ratio must be "
                "greater than 1.")
        LOG.error(msg)
        raise exception.InvalidParameterValue(message=msg)
    return mosr
1015