# Copyright (c) 2015 Huawei Technologies Co., Ltd.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Taskflow tasks and flow factory for managing an existing snapshot.

The flow built by :func:`get_flow` runs, in order: snapshot lookup, a start
notification, a driver size query, quota reservation, the driver-side
"manage existing" call, quota commit and a final status update plus end
notification.
"""

from oslo_config import cfg
from oslo_log import log as logging
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow.types import failure as ft

from cinder import exception
from cinder import flow_utils
from cinder.i18n import _
from cinder import objects
from cinder.objects import fields
from cinder import quota
from cinder import quota_utils
from cinder.volume.flows import common as flow_common
from cinder.volume import utils as volume_utils


CONF = cfg.CONF
LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS

# Action name passed as an addon to every task in this module; it is also
# used (with ':' replaced by '_') to derive the flow name in get_flow().
ACTION = 'snapshot:manage_existing'


class ExtractSnapshotRefTask(flow_utils.CinderTask):
    """Extracts snapshot reference for given snapshot id."""

    default_provides = 'snapshot_ref'

    def __init__(self, db):
        super(ExtractSnapshotRefTask, self).__init__(addons=[ACTION])
        self.db = db

    def execute(self, context, snapshot_id):
        """Load the snapshot object and return it as ``snapshot_ref``.

        :param context: request context used for the lookup
        :param snapshot_id: id of the snapshot to fetch
        :returns: the object returned by ``objects.Snapshot.get_by_id``
        """
        # NOTE(wanghao): this will fetch the snapshot from the database, if
        # the snapshot has been deleted before we got here then this should
        # fail.
        #
        # In the future we might want to have a lock on the snapshot_id so that
        # the snapshot can not be deleted while its still being created?
        snapshot_ref = objects.Snapshot.get_by_id(context, snapshot_id)
        LOG.debug("ExtractSnapshotRefTask return"
                  " snapshot_ref: %s", snapshot_ref)
        return snapshot_ref

    def revert(self, context, snapshot_id, result, **kwargs):
        """Error out the snapshot if it was fetched before the flow failed.

        :param result: what ``execute`` produced, or a taskflow ``Failure``
                       when ``execute`` itself failed
        """
        # execute() never produced a snapshot, so there is nothing to mark.
        if isinstance(result, ft.Failure):
            return

        flow_common.error_out(result)
        LOG.error("Snapshot %s: create failed", result.id)


class NotifySnapshotActionTask(flow_utils.CinderTask):
    """Performs a notification about the given snapshot when called.

    Reversion strategy: N/A
    """

    def __init__(self, db, event_suffix, host):
        super(NotifySnapshotActionTask, self).__init__(addons=[ACTION,
                                                               event_suffix])
        self.db = db
        self.event_suffix = event_suffix
        self.host = host

    def execute(self, context, snapshot_ref):
        """Send a usage notification for ``snapshot_ref``.

        A ``CinderException`` raised while notifying is logged and
        swallowed so that it does not fail the flow.
        """
        snapshot_id = snapshot_ref['id']
        try:
            volume_utils.notify_about_snapshot_usage(context, snapshot_ref,
                                                     self.event_suffix,
                                                     host=self.host)
        except exception.CinderException:
            # If notification sending or snapshot database entry reading fails
            # then we shouldn't error out the whole workflow since this is
            # not always information that must be sent for snapshots to operate
            LOG.exception("Failed notifying about the snapshot "
                          "action %(event)s for snapshot %(snp_id)s.",
                          {'event': self.event_suffix,
                           'snp_id': snapshot_id})


class PrepareForQuotaReservationTask(flow_utils.CinderTask):
    """Gets the snapshot size from the driver."""

    default_provides = {'size', 'snapshot_properties'}

    def __init__(self, db, driver):
        super(PrepareForQuotaReservationTask, self).__init__(addons=[ACTION])
        self.db = db
        self.driver = driver

    def execute(self, context, snapshot_ref, manage_existing_ref):
        """Ask the driver for the size of the snapshot being managed.

        :param snapshot_ref: snapshot being brought under management
        :param manage_existing_ref: backend reference handed to the driver
        :returns: dict with ``size`` (as reported by the driver) and
                  ``snapshot_properties`` (the unchanged ``snapshot_ref``)
        :raises DriverNotInitialized: if the driver is not initialized; the
                                      snapshot is errored out first
        """
        if not self.driver.initialized:
            driver_name = (self.driver.configuration.
                           safe_get('volume_backend_name'))
            LOG.error("Unable to manage existing snapshot. "
                      "Volume driver %s not initialized.", driver_name)
            flow_common.error_out(snapshot_ref, reason=_("Volume driver %s "
                                                         "not initialized.") %
                                  driver_name)
            raise exception.DriverNotInitialized()

        size = self.driver.manage_existing_snapshot_get_size(
            snapshot=snapshot_ref,
            existing_ref=manage_existing_ref)

        return {'size': size,
                'snapshot_properties': snapshot_ref}


class QuotaReserveTask(flow_utils.CinderTask):
    """Reserves a single snapshot with the given size.

    Reversion strategy: rollback the quota reservation.

    Warning Warning: if the process that is running this reserve and commit
    process fails (or is killed) before the quota is rolled back or committed,
    it does appear like the quota will never be rolled back. This makes
    software upgrades hard (inflight operations will need to be stopped or
    allowed to complete before the upgrade can occur). *In the future* when
    taskflow has persistence built-in this should be easier to correct via
    an automated or manual process.
    """

    default_provides = {'reservations'}

    def __init__(self):
        super(QuotaReserveTask, self).__init__(addons=[ACTION])

    def execute(self, context, size, snapshot_ref, optional_args):
        """Reserve quota for the managed snapshot.

        :param size: snapshot size reported by the driver
        :param snapshot_ref: snapshot whose current ``volume_size`` is
                             subtracted from ``size``
        :param optional_args: dict of flow options; a truthy ``update_size``
                              drops the ``snapshots`` count from the
                              reservation
        :returns: dict with the ``reservations`` made
        """
        try:
            if CONF.no_snapshot_gb_quota:
                reserve_opts = {'snapshots': 1}
            else:
                # NOTE(tommylikehu): We only use the difference of size here
                # as we already committed the original size at the API
                # service before and this reservation task is only used for
                # managing snapshots now.
                reserve_opts = {'snapshots': 1,
                                'gigabytes':
                                    int(size) - snapshot_ref.volume_size}
            if optional_args.get('update_size'):
                reserve_opts.pop('snapshots', None)
            volume = objects.Volume.get_by_id(context, snapshot_ref.volume_id)
            QUOTAS.add_volume_type_opts(context,
                                        reserve_opts,
                                        volume.volume_type_id)
            reservations = QUOTAS.reserve(context, **reserve_opts)
            return {
                'reservations': reservations,
            }
        except exception.OverQuota as e:
            # NOTE(review): presumably this helper raises a more specific
            # over-limit exception; if it ever returns, execute() returns
            # None -- confirm against quota_utils.
            quota_utils.process_reserve_over_quota(
                context, e,
                resource='snapshots',
                size=size)

    def revert(self, context, result, optional_args, **kwargs):
        """Roll back the reservation unless it was already committed.

        :param result: what ``execute`` produced, or a taskflow ``Failure``
        :param optional_args: dict whose ``is_quota_committed`` flag is set
                              by QuotaCommitTask once the commit happened
        """
        # We never produced a result and therefore can't destroy anything.
        if isinstance(result, ft.Failure):
            return

        if optional_args['is_quota_committed']:
            # The reservations have already been committed and can not be
            # rolled back at this point.
            return
        # We actually produced an output that we can revert so lets attempt
        # to use said output to rollback the reservation.
        reservations = result['reservations']
        try:
            QUOTAS.rollback(context, reservations)
        except exception.CinderException:
            # We are already reverting, therefore we should silence this
            # exception since a second exception being active will be bad.
            LOG.exception("Failed rolling back quota for"
                          " %s reservations.", reservations)


class QuotaCommitTask(flow_utils.CinderTask):
    """Commits the reservation.

    Reversion strategy: reserve and commit a negative usage (one snapshot
    and the snapshot's ``volume_size`` in gigabytes) for the project; the
    rollback of an uncommitted reservation is handled by the task that did
    the initial reservation (see: QuotaReserveTask).

    Warning Warning: if the process that is running this reserve and commit
    process fails (or is killed) before the quota is rolled back or committed,
    it does appear like the quota will never be rolled back. This makes
    software upgrades hard (inflight operations will need to be stopped or
    allowed to complete before the upgrade can occur). *In the future* when
    taskflow has persistence built-in this should be easier to correct via
    an automated or manual process.
    """

    def __init__(self):
        super(QuotaCommitTask, self).__init__(addons=[ACTION])

    def execute(self, context, reservations, snapshot_properties,
                optional_args):
        """Commit ``reservations`` and flag the commit in ``optional_args``.

        :returns: dict carrying ``snapshot_properties`` through unchanged
        """
        QUOTAS.commit(context, reservations)
        # updating is_quota_committed attribute of optional_args dictionary
        optional_args['is_quota_committed'] = True
        return {'snapshot_properties': snapshot_properties}

    def revert(self, context, result, **kwargs):
        """Give back the committed quota; failures are only logged.

        :param result: what ``execute`` produced, or a taskflow ``Failure``
        """
        # We never produced a result and therefore can't destroy anything.
        if isinstance(result, ft.Failure):
            return
        snapshot = result['snapshot_properties']
        try:
            reserve_opts = {'snapshots': -1,
                            'gigabytes': -snapshot['volume_size']}
            reservations = QUOTAS.reserve(context,
                                          project_id=context.project_id,
                                          **reserve_opts)
            if reservations:
                QUOTAS.commit(context, reservations,
                              project_id=context.project_id)
        except Exception:
            # Deliberately broad: we are already reverting, so any failure
            # here is logged rather than raised.
            LOG.exception("Failed to update quota while deleting "
                          "snapshots: %s", snapshot['id'])


class ManageExistingTask(flow_utils.CinderTask):
    """Brings an existing snapshot under Cinder management."""

    default_provides = {'snapshot', 'new_status'}

    def __init__(self, db, driver):
        super(ManageExistingTask, self).__init__(addons=[ACTION])
        self.db = db
        self.driver = driver

    def execute(self, context, snapshot_ref, manage_existing_ref, size):
        """Call the driver and persist the resulting model update.

        :param snapshot_ref: snapshot being managed
        :param manage_existing_ref: backend reference handed to the driver
        :param size: size stored as the snapshot's ``volume_size``
        :returns: dict with ``snapshot`` (the ``snapshot_ref`` passed in)
                  and ``new_status`` (``SnapshotStatus.AVAILABLE``)
        :raises CinderException: re-raised after logging when the snapshot
                                 record cannot be updated
        """
        model_update = self.driver.manage_existing_snapshot(
            snapshot=snapshot_ref,
            existing_ref=manage_existing_ref)
        # The driver may return nothing; always record the size regardless.
        if not model_update:
            model_update = {}
        model_update['volume_size'] = size
        try:
            snapshot_object = objects.Snapshot.get_by_id(context,
                                                         snapshot_ref['id'])
            snapshot_object.update(model_update)
            snapshot_object.save()
        except exception.CinderException:
            LOG.exception("Failed updating model of snapshot "
                          "%(snapshot_id)s with creation provided model "
                          "%(model)s.",
                          {'snapshot_id': snapshot_ref['id'],
                           'model': model_update})
            raise

        return {'snapshot': snapshot_ref,
                'new_status': fields.SnapshotStatus.AVAILABLE}


class CreateSnapshotOnFinishTask(NotifySnapshotActionTask):
    """Perform final snapshot actions.

    When a snapshot is created successfully it is expected that MQ
    notifications and database updates will occur to 'signal' to others that
    the snapshot is now ready for usage. This task does those notifications and
    updates in a reliable manner (not re-raising exceptions if said actions can
    not be triggered).

    Reversion strategy: N/A
    """

    def execute(self, context, snapshot, new_status):
        """Set the snapshot status to ``new_status`` and notify.

        A ``CinderException`` from the update or the notification is logged
        and not re-raised.
        """
        LOG.debug("Begin to call CreateSnapshotOnFinishTask execute.")
        snapshot_id = snapshot['id']
        LOG.debug("New status: %s", new_status)
        update = {
            'status': new_status
        }
        try:
            # TODO(harlowja): is it acceptable to only log if this fails??
            # or are there other side-effects that this will cause if the
            # status isn't updated correctly (aka it will likely be stuck in
            # 'building' if this fails)??
            snapshot_object = objects.Snapshot.get_by_id(context,
                                                         snapshot_id)
            snapshot_object.update(update)
            snapshot_object.save()
            # Now use the parent to notify.
            super(CreateSnapshotOnFinishTask, self).execute(context, snapshot)
        except exception.CinderException:
            LOG.exception("Failed updating snapshot %(snapshot_id)s with "
                          "%(update)s.", {'snapshot_id': snapshot_id,
                                          'update': update})
        # Even if the update fails, the snapshot is ready.
        LOG.info("Snapshot %s created successfully.", snapshot_id)


def get_flow(context, db, driver, host, snapshot_id, ref):
    """Constructs and returns the manager entry point flow.

    :param context: request context injected into the flow store
    :param db: database API handed to the tasks that take one
    :param driver: volume driver used to size and manage the snapshot
    :param host: host passed to the notification tasks
    :param snapshot_id: id of the snapshot to bring under management
    :param ref: backend reference, stored as ``manage_existing_ref``
    :returns: a loaded (but not yet run) taskflow engine
    """

    # The snapshot_id placeholder needs its leading '%'; without it the id
    # is never interpolated and the literal "(snapshot_id)s" is logged.
    LOG.debug("Input parameters: context=%(context)s, db=%(db)s,"
              "driver=%(driver)s, host=%(host)s, "
              "snapshot_id=%(snapshot_id)s, ref=%(ref)s.",
              {'context': context,
               'db': db,
               'driver': driver,
               'host': host,
               'snapshot_id': snapshot_id,
               'ref': ref}
              )
    flow_name = ACTION.replace(":", "_") + "_manager"
    snapshot_flow = linear_flow.Flow(flow_name)

    # This injects the initial starting flow values into the workflow so that
    # the dependency order of the tasks provides/requires can be correctly
    # determined.
    create_what = {
        'context': context,
        'snapshot_id': snapshot_id,
        'manage_existing_ref': ref,
        'optional_args': {'is_quota_committed': False, 'update_size': True}
    }

    notify_start_msg = "manage_existing_snapshot.start"
    notify_end_msg = "manage_existing_snapshot.end"
    snapshot_flow.add(ExtractSnapshotRefTask(db),
                      NotifySnapshotActionTask(db, notify_start_msg,
                                               host=host),
                      PrepareForQuotaReservationTask(db, driver),
                      QuotaReserveTask(),
                      ManageExistingTask(db, driver),
                      QuotaCommitTask(),
                      CreateSnapshotOnFinishTask(db, notify_end_msg,
                                                 host=host))
    LOG.debug("Begin to return taskflow.engines."
              "load(snapshot_flow,store=create_what).")
    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(snapshot_flow, store=create_what)