1# -*- coding: utf-8 -*- #
2# Copyright 2019 Google LLC. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#    http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Utilities for waiting on Compute Engine operations."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import unicode_literals
20
21from apitools.base.py import exceptions as apitools_exceptions
22
23from googlecloudsdk.api_lib.compute import batch_helper
24from googlecloudsdk.api_lib.compute import path_simplifier
25from googlecloudsdk.api_lib.util import exceptions as http_exceptions
26from googlecloudsdk.command_lib.util import time_util
27from googlecloudsdk.core import log
28
# Default upper bound, in seconds (30 minutes), on how long operations are
# polled before they are reported as timed out.
_POLLING_TIMEOUT_SEC = 60 * 30
# Cap, in seconds, on the growing sleep between polls in WaitForOperations.
_MAX_TIME_BETWEEN_POLLS_SEC = 5
31
# The set of possible operation types is {insert, delete, update,
# *.insert, *.delete, *.update} + all verbs. For example,
#
#        verb                    op type
#   Instances.setMetadata      setMetadata
#   Instances.insert           insert
#   InstanceTemplates.delete   compute.instanceTemplates.delete
#
# In our status reporting, we use the following mapping, which is matched
# against the end of the operation type. Anything not in the map is reported
# as "Updated".
# Maps an operation-type suffix to the verb forms used in status messages.
# Built from (suffix, past tense, present tense) triples; every suffix gets
# its own {'past': ..., 'present': ...} dict and the listed order is kept.
_HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES = {
    suffix: {'past': past, 'present': present}
    for suffix, past, present in (
        ('createSnapshot', 'created', 'create'),
        ('recreateInstancesInstanceGroupManager', 'recreated', 'recreate'),
        ('createFirewallSecurityPolicy', 'created', 'create'),
        ('deleteFirewallSecurityPolicy', 'deleted', 'delete'),
        ('createPSCServiceEndpoint', 'created', 'create'),
        ('deletePscForwardingRule', 'deleted', 'delete'),
        ('createRegionPscForwardingRule', 'created', 'create'),
        ('deleteRegionPscForwardingRule', 'deleted', 'delete'),
        ('insert', 'created', 'create'),
        ('delete', 'deleted', 'delete'),
        ('update', 'updated', 'update'),
        ('invalidateCache', 'completed invalidation for',
         'complete invalidation for'),
    )
}
91
92
def _HumanFriendlyNamesForOp(op_type):
  """Looks up the human-friendly verb forms for an operation type.

  Args:
    op_type: str, The operation type, e.g. 'insert' or
      'compute.instanceTemplates.delete'.

  Returns:
    A dict with 'past' and 'present' keys. It is the table entry of the first
    suffix that op_type ends with, or the "update" forms if none matches.
  """
  fallback = {'past': 'updated', 'present': 'update'}
  suffix_table = _HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES
  return next(
      (names for suffix, names in suffix_table.items()
       if op_type.endswith(suffix)),
      fallback)
99
100
def _HumanFriendlyNameForOpPastTense(op_type):
  """Returns the past-tense verb (e.g. 'created') for the operation type."""
  verb_forms = _HumanFriendlyNamesForOp(op_type)
  return verb_forms['past']
103
104
def _HumanFriendlyNameForOpPresentTense(op_type):
  """Returns the present-tense verb (e.g. 'create') for the operation type."""
  verb_forms = _HumanFriendlyNamesForOp(op_type)
  return verb_forms['present']
107
108
def _IsDeleteOp(op_type):
  """Returns True if the operation type is reported as a deletion."""
  past_tense = _HumanFriendlyNameForOpPastTense(op_type)
  return 'deleted' == past_tense
111
112
def _RecordProblems(operation, warnings, errors):
  """Copies the operation's warnings and errors into the given lists.

  Args:
    operation: The Operation whose `warnings` and `error` fields are read.
    warnings: list, Output parameter; each warning's message is appended.
    errors: list, Output parameter; a (httpErrorStatusCode, message) tuple is
      appended for every error attached to the operation.
  """
  warnings.extend([item.message for item in operation.warnings or []])

  failure = operation.error
  if not failure:
    return
  errors.extend([(operation.httpErrorStatusCode, item.message)
                 for item in failure.errors or []])
120
121
def _RecordUnfinishedOperations(operations, errors):
  """Adds an error message stating that the given operations timed out.

  Args:
    operations: list, The Operations that did not reach the DONE state. The
      verb in the message is derived from the first operation's type.
    errors: list, Output parameter; a single (None, message) tuple is appended
      unless `operations` is empty.
  """
  # Nothing timed out, so there is nothing to report (and no first operation
  # to derive the verb from).
  if not operations:
    return

  # Some operations have no targetLink; fall back to the operation's own
  # link or name so that joining the entries cannot fail on None.
  pending_resources = [
      operation.targetLink or operation.selfLink or operation.name
      for operation in operations
  ]
  errors.append(
      (None, ('Did not {action} the following resources within '
              '{timeout}s: {links}. These operations may still be '
              'underway remotely and may still succeed; use gcloud list '
              'and describe commands or '
              'https://console.developers.google.com/ to '
              'check resource state').format(
                  action=_HumanFriendlyNameForOpPresentTense(
                      operations[0].operationType),
                  timeout=_POLLING_TIMEOUT_SEC,
                  links=', '.join(
                      link for link in pending_resources if link))))
136
137
class OperationData(object):
  """Holds all information necessary to poll given operation.

  Attributes:
    operation: An Operation object to poll.
    operation_service: The service that can be used to get operation
      object.
    resource_service: The service of the collection being mutated by
      the operation. If the operation type is not delete, this service
      is used to fetch the mutated object after the operation is done.
    project: str, The project to which the resource belong.
    no_followup: bool, If True, do not send followup GET request.
    followup_override: str, Overrides the target resource name when
      it is different from the resource name which is used to poll.
    always_return_operation: bool, If True, always return operation object
      even if the operation fails.
    errors: An output parameter for capturing errors.
    warnings: An output parameter for capturing warnings.
  """

  def __init__(self,
               operation,
               operation_service,
               resource_service,
               project=None,
               no_followup=False,
               followup_override=None,
               always_return_operation=False):
    self.operation = operation
    self.operation_service = operation_service
    self.resource_service = resource_service
    self.project = project
    self.no_followup = no_followup
    self.followup_override = followup_override
    self.always_return_operation = always_return_operation

    self.errors = []
    self.warnings = []

  def __eq__(self, o):
    """Compares operation, project, services and follow-up settings.

    The always_return_operation flag and the collected errors and warnings
    do not take part in the comparison.

    Args:
      o: The object to compare with.

    Returns:
      True if `o` is an OperationData with equal compared fields.
    """
    if not isinstance(o, OperationData):
      return False
    return (self.operation == o.operation and self.project == o.project and
            self.operation_service == o.operation_service and
            self.resource_service == o.resource_service and
            self.no_followup == o.no_followup and
            self.followup_override == o.followup_override)

  def __hash__(self):
    """Hashes the operation's selfLink and the other fields used by __eq__."""
    return (hash(self.operation.selfLink) ^ hash(self.project)
            ^ hash(self.operation_service) ^ hash(self.resource_service)
            ^ hash(self.no_followup) ^ hash(self.followup_override))

  def __ne__(self, o):
    """Returns the negation of __eq__."""
    return not self == o

  def SetOperation(self, operation):
    """Updates the operation.

    Args:
      operation: Operation to be assigned.
    """
    self.operation = operation

  def IsGlobalOrganizationOperation(self):
    """Returns True if the operation is polled via globalOrganizationOperations.

    Returns:
      False when the client has no globalOrganizationOperations service;
      otherwise whether operation_service equals that service.
    """
    if not hasattr(self.operation_service.client,
                   'globalOrganizationOperations'):
      return False
    return (self.operation_service ==
            self.operation_service.client.globalOrganizationOperations)

  def IsDone(self):
    """Returns true if the operation is done."""
    operation_type = self.operation_service.GetResponseType('Get')
    done = operation_type.StatusValueValuesEnum.DONE
    return self.operation.status == done

  def _SupportOperationWait(self):
    """Returns True if the operation service lists a 'Wait' method."""
    return 'Wait' in self.operation_service.GetMethodsList()

  def ResourceGetRequest(self):
    """Generates apitools request message to get the resource.

    Returns:
      The request message for resource_service's Get method, or None when no
      project is set and the operation has no targetLink. In that case a
      status line naming the completed action is written instead.
    """

    target_link = self.operation.targetLink

    if self.project:
      request = self.resource_service.GetRequestType('Get')(
          project=self.project)
    else:
      # Gets the flexible resource ID.
      if target_link is None:
        log.status.write('{0}.\n'.format(
            _HumanFriendlyNameForOpPastTense(
                self.operation.operationType).capitalize()))
        return None
      token_list = target_link.split('/')
      flexible_resource_id = token_list[-1]
      request = self.resource_service.GetRequestType('Get')(
          securityPolicy=flexible_resource_id)
    if self.operation.zone:
      request.zone = path_simplifier.Name(self.operation.zone)
    elif self.operation.region:
      request.region = path_simplifier.Name(self.operation.region)
    # The last ordered parameter of the Get method holds the resource name.
    name_field = self.resource_service.GetMethodConfig('Get').ordered_params[-1]

    resource_name = self.followup_override or path_simplifier.Name(
        self.operation.targetLink)

    setattr(request, name_field, resource_name)
    return request

  def _OperationRequest(self, verb):
    """Generates apitools request message to poll the operation.

    Args:
      verb: str, The operation service method to build the request for,
        e.g. 'Get' or 'Wait'.

    Returns:
      The request message, scoped to the operation's zone or region if set.
    """

    if self.project:
      request = self.operation_service.GetRequestType(verb)(
          operation=self.operation.name, project=self.project)
    else:
      # Fetches the parent ID from the operation name.
      token_list = self.operation.name.split('-')
      parent_id = 'organizations/' + token_list[1]
      request = self.operation_service.GetRequestType(verb)(
          operation=self.operation.name, parentId=parent_id)
    if self.operation.zone:
      request.zone = path_simplifier.Name(self.operation.zone)
    elif self.operation.region:
      request.region = path_simplifier.Name(self.operation.region)
    return request

  def OperationGetRequest(self):
    """Generates apitools request message for operations.get method."""
    return self._OperationRequest('Get')

  def OperationWaitRequest(self):
    """Generates apitools request message for operations.wait method."""
    return self._OperationRequest('Wait')

  def _CallService(self, method, request):
    """Calls the service method, recording any HTTP error before re-raising.

    Args:
      method: The callable to invoke with the request.
      request: The request message passed to `method`.

    Returns:
      Whatever `method` returns.

    Raises:
      apitools_exceptions.HttpError: If the call fails. The error and the
        problems of the current operation are recorded on this object first.
    """
    try:
      return method(request)
    except apitools_exceptions.HttpError as e:
      http_err = http_exceptions.HttpException(e)
      self.errors.append((http_err.error.status_code, http_err.message))
      _RecordProblems(self.operation, self.warnings, self.errors)
      raise

  def _PollUntilDoneUsingOperationGet(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation with operation Get method.

    Stops when the operation is done, when an HTTP error occurs, or when
    timeout_sec has elapsed. Problems are recorded on this object.

    Args:
      timeout_sec: The maximum time, in seconds, to keep polling.
    """
    get_request = self.OperationGetRequest()
    start = time_util.CurrentTimeSec()
    poll_time_interval = 0
    max_poll_interval = 5  # 5 seconds

    while True:
      if time_util.CurrentTimeSec() - start > timeout_sec:
        self.errors.append(
            (None, 'operation {} timed out'.format(self.operation.name)))
        _RecordProblems(self.operation, self.warnings, self.errors)
        return

      try:
        self.operation = self._CallService(self.operation_service.Get,
                                           get_request)
      except apitools_exceptions.HttpError:
        # _CallService has already recorded the error.
        return

      if self.IsDone():
        _RecordProblems(self.operation, self.warnings, self.errors)
        return
      # Sleep one second longer after every poll, up to the cap.
      poll_time_interval = min(poll_time_interval + 1, max_poll_interval)
      time_util.Sleep(poll_time_interval)

  def _PollUntilDoneUsingOperationWait(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation with operation Wait method.

    Stops when the operation is done, when an HTTP error occurs, or when
    timeout_sec has elapsed. Problems are recorded on this object.

    Args:
      timeout_sec: The maximum time, in seconds, to keep polling.
    """
    wait_request = self.OperationWaitRequest()
    start = time_util.CurrentTimeSec()

    while not self.IsDone():
      if time_util.CurrentTimeSec() - start > timeout_sec:
        self.errors.append(
            (None, 'operation {} timed out'.format(self.operation.name)))
        _RecordProblems(self.operation, self.warnings, self.errors)
        return
      try:
        self.operation = self._CallService(self.operation_service.Wait,
                                           wait_request)
      except apitools_exceptions.HttpError:
        # _CallService has already recorded the error.
        return

    _RecordProblems(self.operation, self.warnings, self.errors)

  def PollUntilDone(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation until it is done.

    Uses the Wait method when the operation service has one and the Get
    method otherwise.

    Args:
      timeout_sec: The maximum time, in seconds, to keep polling.
    """
    if self.IsDone():
      return

    if self._SupportOperationWait():
      self._PollUntilDoneUsingOperationWait(timeout_sec)
    else:
      self._PollUntilDoneUsingOperationGet(timeout_sec)

  def GetResult(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Get the resource which is touched by the operation.

    Args:
      timeout_sec: The maximum time, in seconds, to poll the operation.

    Returns:
      The resource returned by resource_service's Get method. None is
      returned when no follow-up is wanted, the operation has an error or is
      a deletion, there is no resource request to send, or the Get call
      fails with an HTTP error (which is recorded in `errors`).
    """
    self.PollUntilDone(timeout_sec)
    if (self.no_followup or self.operation.error or
        _IsDeleteOp(self.operation.operationType)):
      return None

    resource_get_request = self.ResourceGetRequest()
    # Operations without a target produce no request; there is nothing to get.
    if resource_get_request is None:
      return None

    try:
      return self._CallService(self.resource_service.Get,
                               resource_get_request)
    except apitools_exceptions.HttpError:
      # _CallService has already recorded the error.
      return None
350
351
def WaitForOperations(
    operations_data, http, batch_url, warnings, errors,
    progress_tracker=None, timeout=None, log_result=True):
  """Blocks until the given operations are done or until a timeout is reached.

  Args:
    operations_data: A list of OperationData objects holding Operations to poll.
    http: An HTTP object.
    batch_url: The URL to which batch requests should be sent.
    warnings: An output parameter for capturing warnings.
    errors: An output parameter for capturing errors.
    progress_tracker: progress tracker to tick while waiting for operations to
                      finish.
    timeout: The maximum amount of time, in seconds, to wait for the
      operations to reach the DONE state. A falsy value selects the module
      default.
    log_result: Whether the Operation Waiter should print the result in past
      tense of each request.

  Yields:
    The resources pointed to by the operations' targetLink fields if
    the operation type is not delete. Only resources whose
    corresponding operations reach done are yielded. The Operation itself is
    yielded instead when its OperationData has no_followup set, or when the
    operation failed and always_return_operation is set.
  """
  if not operations_data:
    return
  timeout = timeout or _POLLING_TIMEOUT_SEC

  # Operation -> OperationData mapping will be used to reify operation_service
  # and resource_service from operation_service.Get(operation) response.
  # It is necessary because poll operation is returning only response, but we
  # also need to get operation details to know the service to poll for all
  # unprocessed_operations.
  operation_details = {}
  unprocessed_operations = []
  for operation_data in operations_data:
    operation_details[operation_data.operation.selfLink] = operation_data
    unprocessed_operations.append(operation_data.operation)

  start = time_util.CurrentTimeSec()
  sleep_sec = 0
  # There is only one type of operation in compute API.
  # We pick the type of the first operation in the list.
  operation_type = operations_data[0].operation_service.GetResponseType('Get')
  done_status = operation_type.StatusValueValuesEnum.DONE

  while unprocessed_operations:
    if progress_tracker:
      progress_tracker.Tick()
    resource_requests = []
    operation_requests = []

    log.debug('Operations to inspect: %s', unprocessed_operations)
    for operation in unprocessed_operations:
      # Reify operation
      data = operation_details[operation.selfLink]
      # Need to update the operation since old operation may not have all the
      # required information.
      data.SetOperation(operation)

      operation_service = data.operation_service
      resource_service = data.resource_service

      if operation.status == done_status:
        # The operation has reached the DONE state, so we record any
        # problems it contains (if any) and proceed to get the target
        # resource if there were no problems and the operation is not
        # a deletion.
        _RecordProblems(operation, warnings, errors)

        # We shouldn't attempt to get the target resource if there was
        # anything wrong with the operation. Note that
        # httpErrorStatusCode is set only when the operation is not
        # successful.
        if (operation.httpErrorStatusCode and
            operation.httpErrorStatusCode != 200):  # httplib.OK
          if data.always_return_operation:
            yield operation
          # A failed operation is finished either way: never fall through to
          # the follow-up GET, the success log line or a second yield.
          continue

        # Just in case the server did not set httpErrorStatusCode but
        # the operation did fail, we check the "error" field.
        if operation.error:
          continue

        # If the operation is done and we do not want to get the target resource
        # but do want to print results from the operation, we return the
        # operation.
        if data.no_followup:
          yield operation
          continue

        # We shouldn't get the target resource if the operation type
        # is delete because there will be no resource left.
        if not _IsDeleteOp(operation.operationType):
          request = data.ResourceGetRequest()
          # Some operations do not have target and should not send get request.
          if request:
            resource_requests.append((resource_service, 'Get', request))

        # Only log when there is target link in the operation.
        if operation.targetLink and log_result:
          log.status.write('{0} [{1}].\n'.format(
              _HumanFriendlyNameForOpPastTense(
                  operation.operationType).capitalize(), operation.targetLink))

      else:
        # The operation has not reached the DONE state, so we add a request
        # to poll the operation.
        # TODO(b/129413862): Global org operation service supports wait API.
        if data.IsGlobalOrganizationOperation():
          request = data.OperationGetRequest()
          operation_requests.append((operation_service, 'Get', request))
        else:
          request = data.OperationWaitRequest()
          operation_requests.append((operation_service, 'Wait', request))

    requests = resource_requests + operation_requests
    if not requests:
      break

    responses, request_errors = batch_helper.MakeRequests(
        requests=requests,
        http=http,
        batch_url=batch_url)

    errors.extend(request_errors)

    # Operation responses are inspected on the next pass; anything else is a
    # fetched resource and is handed to the caller right away.
    all_done = True
    unprocessed_operations = []
    for response in responses:
      if isinstance(response, operation_type):
        unprocessed_operations.append(response)
        if response.status != done_status:
          all_done = False
      else:
        yield response

    # If there are no more operations, we are done.
    if not unprocessed_operations:
      break

    # If all of the operations are done, we should ignore the timeout and ignore
    # the sleep.
    if all_done:
      continue

    # Did we time out? If so, record the operations that timed out so
    # they can be reported to the user.
    if time_util.CurrentTimeSec() - start > timeout:
      log.debug('Timeout of %ss reached.', timeout)
      _RecordUnfinishedOperations(unprocessed_operations, errors)
      break

    # Sleeps before trying to poll the operations again.
    sleep_sec = min(sleep_sec + 1, _MAX_TIME_BETWEEN_POLLS_SEC)
    log.debug('Sleeping for %ss.', sleep_sec)
    time_util.Sleep(sleep_sec)
509