1#   Copyright (c) 2015 Huawei Technologies Co., Ltd.
2#
3#   Licensed under the Apache License, Version 2.0 (the "License"); you may
4#   not use this file except in compliance with the License. You may obtain
5#   a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#   Unless required by applicable law or agreed to in writing, software
10#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#   License for the specific language governing permissions and limitations
13#   under the License.
14
15from oslo_config import cfg
16from oslo_log import log as logging
17import taskflow.engines
18from taskflow.patterns import linear_flow
19from taskflow.types import failure as ft
20
21from cinder import exception
22from cinder import flow_utils
23from cinder.i18n import _
24from cinder import objects
25from cinder.objects import fields
26from cinder import quota
27from cinder import quota_utils
28from cinder.volume.flows import common as flow_common
29from cinder.volume import utils as volume_utils
30
31
# oslo.config's CONF object; this module reads ``no_snapshot_gb_quota``
# from it when building quota reservations.
CONF = cfg.CONF
# Logger named after this module.
LOG = logging.getLogger(__name__)
# Short alias for the quota engine object exposed by ``cinder.quota``.
QUOTAS = quota.QUOTAS

# Action name handed to every task in this module as an addon; get_flow()
# also derives the flow name from it.
ACTION = 'snapshot:manage_existing'
37
38
class ExtractSnapshotRefTask(flow_utils.CinderTask):
    """Looks up the snapshot object that belongs to a snapshot id.

    Reversion strategy: pass the looked-up snapshot to
    ``flow_common.error_out`` and log the failure.
    """

    default_provides = 'snapshot_ref'

    def __init__(self, db):
        super(ExtractSnapshotRefTask, self).__init__(addons=[ACTION])
        self.db = db

    def execute(self, context, snapshot_id):
        # NOTE(wanghao): the lookup below is expected to fail when the
        # snapshot was deleted before this task got a chance to run.
        #
        # A lock on the snapshot_id may be worth adding in the future so that
        # the snapshot cannot be deleted while it is still being created.
        found = objects.Snapshot.get_by_id(context, snapshot_id)
        LOG.debug("ExtractSnapshotRefTask return snapshot_ref: %s", found)
        return found

    def revert(self, context, snapshot_id, result, **kwargs):
        # Only act when execute() really produced a snapshot; a Failure means
        # there is no object to flag.
        if not isinstance(result, ft.Failure):
            flow_common.error_out(result)
            LOG.error("Snapshot %s: create failed", result.id)
66
67
class NotifySnapshotActionTask(flow_utils.CinderTask):
    """Emits a usage notification for the given snapshot.

    Reversion strategy: N/A
    """

    def __init__(self, db, event_suffix, host):
        addons = [ACTION, event_suffix]
        super(NotifySnapshotActionTask, self).__init__(addons=addons)
        self.db = db
        self.event_suffix = event_suffix
        self.host = host

    def execute(self, context, snapshot_ref):
        snapshot_id = snapshot_ref['id']
        try:
            volume_utils.notify_about_snapshot_usage(
                context, snapshot_ref, self.event_suffix, host=self.host)
        except exception.CinderException:
            # Notifications are best effort: a failure while sending one (or
            # while reading the snapshot database entry) is logged and
            # swallowed so that the workflow as a whole keeps going.
            LOG.exception("Failed notifying about the snapshot "
                          "action %(event)s for snapshot %(snp_id)s.",
                          dict(event=self.event_suffix,
                               snp_id=snapshot_id))
95
96
class PrepareForQuotaReservationTask(flow_utils.CinderTask):
    """Asks the driver for the size of the snapshot being managed."""

    default_provides = {'size', 'snapshot_properties'}

    def __init__(self, db, driver):
        super(PrepareForQuotaReservationTask, self).__init__(addons=[ACTION])
        self.db = db
        self.driver = driver

    def execute(self, context, snapshot_ref, manage_existing_ref):
        driver = self.driver
        if not driver.initialized:
            # Without an initialized driver nothing can be managed: log it,
            # hand the snapshot to error_out and abort the flow.
            backend = driver.configuration.safe_get('volume_backend_name')
            LOG.error("Unable to manage existing snapshot. "
                      "Volume driver %s not initialized.", backend)
            reason = _("Volume driver %s not initialized.") % backend
            flow_common.error_out(snapshot_ref, reason=reason)
            raise exception.DriverNotInitialized()

        snapshot_size = driver.manage_existing_snapshot_get_size(
            snapshot=snapshot_ref, existing_ref=manage_existing_ref)
        # The snapshot reference is re-published under a second name so that
        # later tasks can require it as 'snapshot_properties'.
        return {'size': snapshot_size, 'snapshot_properties': snapshot_ref}
124
125
class QuotaReserveTask(flow_utils.CinderTask):
    """Reserves quota for a single snapshot of the given size.

    Reversion strategy: rollback the quota reservation.

    Warning: if the process running this reserve and commit sequence fails
    (or is killed) before the quota has been rolled back or committed, it
    appears that the quota will never be rolled back. This makes software
    upgrades hard (inflight operations need to be stopped or allowed to
    complete before the upgrade can occur). *In the future*, when taskflow
    has persistence built-in, this should be easier to correct via an
    automated or manual process.
    """

    default_provides = {'reservations'}

    def __init__(self):
        super(QuotaReserveTask, self).__init__(addons=[ACTION])

    def execute(self, context, size, snapshot_ref, optional_args):
        try:
            quota_request = {'snapshots': 1}
            if not CONF.no_snapshot_gb_quota:
                # NOTE(tommylikehu): only the size difference is reserved
                # here; the original size was already committed by the API
                # service and this task is only used for managing snapshots.
                quota_request['gigabytes'] = (
                    int(size) - snapshot_ref.volume_size)
            if optional_args.get('update_size'):
                # Size-only update: no additional snapshot is counted.
                quota_request.pop('snapshots', None)
            parent_volume = objects.Volume.get_by_id(
                context, snapshot_ref.volume_id)
            QUOTAS.add_volume_type_opts(
                context, quota_request, parent_volume.volume_type_id)
            return {'reservations': QUOTAS.reserve(context, **quota_request)}
        except exception.OverQuota as over_quota:
            quota_utils.process_reserve_over_quota(
                context, over_quota, resource='snapshots', size=size)

    def revert(self, context, result, optional_args, **kwargs):
        # Nothing to undo when execute() never produced a result, and
        # nothing that can be undone once the reservations were committed.
        if (isinstance(result, ft.Failure)
                or optional_args['is_quota_committed']):
            return

        # execute() produced reservations that are still pending, so try to
        # roll them back.
        pending = result['reservations']
        try:
            QUOTAS.rollback(context, pending)
        except exception.CinderException:
            # A revert is already in progress; raising a second exception
            # on top of it would be bad, so just log this one.
            LOG.exception("Failed rolling back quota for"
                          " %s reservations.", pending)
192
193
class QuotaCommitTask(flow_utils.CinderTask):
    """Commits the reservation.

    Reversion strategy: reserve and commit a negative quota delta for the
    snapshot. Rolling back a reservation that was never committed is handled
    by the task that did the initial reservation (see: QuotaReserveTask).

    Warning: if the process running this reserve and commit sequence fails
    (or is killed) before the quota has been rolled back or committed, it
    appears that the quota will never be rolled back. This makes software
    upgrades hard (inflight operations need to be stopped or allowed to
    complete before the upgrade can occur). *In the future*, when taskflow
    has persistence built-in, this should be easier to correct via an
    automated or manual process.
    """

    def __init__(self):
        super(QuotaCommitTask, self).__init__(addons=[ACTION])

    def execute(self, context, reservations, snapshot_properties,
                optional_args):
        """Commit ``reservations`` and record that the commit happened.

        :param context: request context handed to the quota engine
        :param reservations: reservations produced by QuotaReserveTask
        :param snapshot_properties: snapshot passed through to revert()
        :param optional_args: shared dict; 'is_quota_committed' is set to
                              True so QuotaReserveTask.revert() skips its
                              rollback
        :returns: dict holding the snapshot under 'snapshot_properties'
        """
        QUOTAS.commit(context, reservations)
        # updating is_quota_committed attribute of optional_args dictionary
        optional_args['is_quota_committed'] = True
        return {'snapshot_properties': snapshot_properties}

    def revert(self, context, result, **kwargs):
        """Give back the quota held for the snapshot (best effort).

        Any exception raised while doing so is logged and swallowed, since
        a revert is already in progress.
        """
        # We never produced a result and therefore can't destroy anything.
        if isinstance(result, ft.Failure):
            return
        snapshot = result['snapshot_properties']
        try:
            reserve_opts = {'snapshots': -1}
            # Mirror QuotaReserveTask: when snapshot sizes are not counted
            # against the gigabytes quota there is nothing to give back for
            # it, and releasing it anyway would under-count usage.
            if not CONF.no_snapshot_gb_quota:
                reserve_opts['gigabytes'] = -snapshot['volume_size']
            reservations = QUOTAS.reserve(context,
                                          project_id=context.project_id,
                                          **reserve_opts)
            if reservations:
                QUOTAS.commit(context, reservations,
                              project_id=context.project_id)
        except Exception:
            LOG.exception("Failed to update quota while deleting "
                          "snapshots: %s", snapshot['id'])
236
237
class ManageExistingTask(flow_utils.CinderTask):
    """Asks the driver to take over an existing snapshot and saves the result.

    The driver's model update, extended with the measured size, is written
    to the snapshot record.
    """

    default_provides = {'snapshot', 'new_status'}

    def __init__(self, db, driver):
        super(ManageExistingTask, self).__init__(addons=[ACTION])
        self.db = db
        self.driver = driver

    def execute(self, context, snapshot_ref, manage_existing_ref, size):
        # Drivers may return nothing; fall back to an empty update then.
        changes = self.driver.manage_existing_snapshot(
            snapshot=snapshot_ref, existing_ref=manage_existing_ref) or {}
        changes['volume_size'] = size
        try:
            stored = objects.Snapshot.get_by_id(context, snapshot_ref['id'])
            stored.update(changes)
            stored.save()
        except exception.CinderException:
            # Log with full context, then let the failure abort the flow.
            LOG.exception("Failed updating model of snapshot "
                          "%(snapshot_id)s with creation provided model "
                          "%(model)s.",
                          dict(snapshot_id=snapshot_ref['id'],
                               model=changes))
            raise

        return dict(snapshot=snapshot_ref,
                    new_status=fields.SnapshotStatus.AVAILABLE)
270
271
class CreateSnapshotOnFinishTask(NotifySnapshotActionTask):
    """Performs the final snapshot actions.

    Once a snapshot has been created successfully, its database record is
    updated with the new status and a notification is sent to 'signal' to
    others that it is ready for usage. Both steps are best effort: a
    CinderException raised by either one is logged and not re-raised.

    Reversion strategy: N/A
    """

    def execute(self, context, snapshot, new_status):
        LOG.debug("Begin to call CreateSnapshotOnFinishTask execute.")
        snapshot_id = snapshot['id']
        LOG.debug("New status: %s", new_status)
        status_update = {'status': new_status}
        try:
            # TODO(harlowja): is it acceptable to only log if this fails??
            # or are there other side-effects that this will cause if the
            # status isn't updated correctly (aka it will likely be stuck in
            # 'building' if this fails)??
            stored = objects.Snapshot.get_by_id(context, snapshot_id)
            stored.update(status_update)
            stored.save()
            # The parent class takes care of the actual notification.
            super(CreateSnapshotOnFinishTask, self).execute(context, snapshot)
        except exception.CinderException:
            LOG.exception("Failed updating snapshot %(snapshot_id)s with "
                          "%(update)s.",
                          dict(snapshot_id=snapshot_id,
                               update=status_update))
        # The snapshot is ready regardless of whether the update succeeded.
        LOG.info("Snapshot %s created successfully.", snapshot_id)
308
309
def get_flow(context, db, driver, host, snapshot_id, ref):
    """Constructs and returns the manager entry point flow.

    The flow is a linear sequence that looks up the snapshot, sends the
    start notification, asks the driver for the size, reserves quota, lets
    the driver manage the snapshot, commits the quota and finally updates
    the status and sends the end notification.

    :param context: request context injected into the flow store
    :param db: database API object handed to the tasks
    :param driver: volume driver used to manage the existing snapshot
    :param host: host name attached to the notifications
    :param snapshot_id: id of the snapshot record to manage
    :param ref: driver-specific reference to the existing snapshot
    :returns: a taskflow engine that is loaded but not yet run
    """

    # Every placeholder needs its leading '%'; without it the literal text
    # "(snapshot_id)s" is logged instead of the value.
    LOG.debug("Input parameters: context=%(context)s, db=%(db)s, "
              "driver=%(driver)s, host=%(host)s, "
              "snapshot_id=%(snapshot_id)s, ref=%(ref)s.",
              {'context': context,
               'db': db,
               'driver': driver,
               'host': host,
               'snapshot_id': snapshot_id,
               'ref': ref}
              )
    flow_name = ACTION.replace(":", "_") + "_manager"
    snapshot_flow = linear_flow.Flow(flow_name)

    # This injects the initial starting flow values into the workflow so that
    # the dependency order of the tasks provides/requires can be correctly
    # determined.
    create_what = {
        'context': context,
        'snapshot_id': snapshot_id,
        'manage_existing_ref': ref,
        # 'is_quota_committed' is flipped by QuotaCommitTask; 'update_size'
        # makes QuotaReserveTask drop the 'snapshots' count from its request.
        'optional_args': {'is_quota_committed': False, 'update_size': True}
    }

    notify_start_msg = "manage_existing_snapshot.start"
    notify_end_msg = "manage_existing_snapshot.end"
    snapshot_flow.add(ExtractSnapshotRefTask(db),
                      NotifySnapshotActionTask(db, notify_start_msg,
                                               host=host),
                      PrepareForQuotaReservationTask(db, driver),
                      QuotaReserveTask(),
                      ManageExistingTask(db, driver),
                      QuotaCommitTask(),
                      CreateSnapshotOnFinishTask(db, notify_end_msg,
                                                 host=host))
    LOG.debug("Begin to return taskflow.engines."
              "load(snapshot_flow,store=create_what).")
    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(snapshot_flow, store=create_what)
351