# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for waiting on Compute Engine operations."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as apitools_exceptions

from googlecloudsdk.api_lib.compute import batch_helper
from googlecloudsdk.api_lib.compute import path_simplifier
from googlecloudsdk.api_lib.util import exceptions as http_exceptions
from googlecloudsdk.command_lib.util import time_util
from googlecloudsdk.core import log

# Default number of seconds to wait for operations to reach DONE before
# giving up and reporting them as unfinished (30 minutes).
_POLLING_TIMEOUT_SEC = 60 * 30
# Upper bound, in seconds, on the linearly growing sleep between polls.
_MAX_TIME_BETWEEN_POLLS_SEC = 5

# The set of possible operation types is {insert, delete, update,
# *.insert, *.delete, *.update} + all verbs. For example,
#
#        verb                        op type
#   Instances.setMetadata        setMetadata
#   Instances.insert             insert
#   InstanceTemplates.delete     compute.instanceTemplates.delete
#
# In our status reporting, we use the following mapping, matched against the
# end of the operation type. Anything not in the map is reported as "Updated".
_HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES = {
    'createSnapshot': {
        'past': 'created',
        'present': 'create'
    },
    'recreateInstancesInstanceGroupManager': {
        'past': 'recreated',
        'present': 'recreate'
    },
    'createFirewallSecurityPolicy': {
        'past': 'created',
        'present': 'create'
    },
    'deleteFirewallSecurityPolicy': {
        'past': 'deleted',
        'present': 'delete'
    },
    'createPSCServiceEndpoint': {
        'past': 'created',
        'present': 'create'
    },
    'deletePscForwardingRule': {
        'past': 'deleted',
        'present': 'delete'
    },
    'createRegionPscForwardingRule': {
        'past': 'created',
        'present': 'create'
    },
    'deleteRegionPscForwardingRule': {
        'past': 'deleted',
        'present': 'delete'
    },
    'insert': {
        'past': 'created',
        'present': 'create'
    },
    'delete': {
        'past': 'deleted',
        'present': 'delete'
    },
    'update': {
        'past': 'updated',
        'present': 'update'
    },
    'invalidateCache': {
        'past': 'completed invalidation for',
        'present': 'complete invalidation for'
    }
}


def _HumanFriendlyNamesForOp(op_type):
  """Returns the past/present-tense wording used to describe an operation.

  Args:
    op_type: str, The operation's operationType, e.g. 'insert' or
      'compute.instanceTemplates.delete'. None is tolerated.

  Returns:
    A dict with 'past' and 'present' keys. The first entry of
    _HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES whose key is a suffix of op_type
    is used; otherwise the operation is described as an update.
  """
  # A missing operationType must not crash status reporting or _IsDeleteOp.
  op_type = op_type or ''
  for suffix, names in _HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES.items():
    if op_type.endswith(suffix):
      return names

  return {'past': 'updated', 'present': 'update'}


def _HumanFriendlyNameForOpPastTense(op_type):
  """Returns the past-tense verb for op_type, e.g. 'created'."""
  return _HumanFriendlyNamesForOp(op_type)['past']


def _HumanFriendlyNameForOpPresentTense(op_type):
  """Returns the present-tense verb for op_type, e.g. 'create'."""
  return _HumanFriendlyNamesForOp(op_type)['present']


def _IsDeleteOp(op_type):
  """Returns True if op_type is reported as a deletion."""
  return _HumanFriendlyNameForOpPastTense(op_type) == 'deleted'


def _RecordProblems(operation, warnings, errors):
  """Records any warnings and errors into the given lists.

  Args:
    operation: The Operation message to inspect.
    warnings: list, Output parameter; receives warning message strings.
    errors: list, Output parameter; receives (httpErrorStatusCode, message)
      tuples, one per entry in operation.error.errors.
  """
  for warning in operation.warnings or []:
    warnings.append(warning.message)
  if operation.error:
    for error in operation.error.errors or []:
      errors.append((operation.httpErrorStatusCode, error.message))


def _RecordUnfinishedOperations(operations, errors, timeout=None):
  """Adds an error message stating that the given operations timed out.

  Args:
    operations: list of Operation messages that did not reach DONE in time.
    errors: list, Output parameter; receives a single (None, message) tuple.
    timeout: int, The timeout, in seconds, that was exceeded. Defaults to
      _POLLING_TIMEOUT_SEC so existing two-argument callers are unaffected.
  """
  if not operations:
    return
  if timeout is None:
    timeout = _POLLING_TIMEOUT_SEC
  # Some operations have no target; identify those by their operation name so
  # the join below never sees None.
  pending_resources = [
      operation.targetLink or operation.name for operation in operations
  ]
  errors.append(
      (None, ('Did not {action} the following resources within '
              '{timeout}s: {links}. These operations may still be '
              'underway remotely and may still succeed; use gcloud list '
              'and describe commands or '
              'https://console.developers.google.com/ to '
              'check resource state').format(
                  action=_HumanFriendlyNameForOpPresentTense(
                      operations[0].operationType),
                  timeout=timeout,
                  links=', '.join(pending_resources))))


class OperationData(object):
  """Holds all information necessary to poll given operation.

  Attributes:
    operation: An Operation object to poll.
    operation_service: The service that can be used to get operation
      object.
    resource_service: The service of the collection being mutated by
      the operation. If the operation type is not delete, this service
      is used to fetch the mutated object after the operation is done.
    project: str, The project to which the resource belongs. When it is not
      set, requests are built for an organization-level operation instead.
    no_followup: bool, If True, do not send followup GET request.
    followup_override: str, Overrides the target resource name when
      it is different from the resource name which is used to poll.
    always_return_operation: bool, If True, always return operation object
      even if the operation fails.
    errors: list of (status code or None, message) tuples; an output
      parameter for capturing errors.
    warnings: list of str; an output parameter for capturing warnings.
  """

  def __init__(self,
               operation,
               operation_service,
               resource_service,
               project=None,
               no_followup=False,
               followup_override=None,
               always_return_operation=False):
    self.operation = operation
    self.operation_service = operation_service
    self.resource_service = resource_service
    self.project = project
    self.no_followup = no_followup
    self.followup_override = followup_override
    self.always_return_operation = always_return_operation

    self.errors = []
    self.warnings = []

  def __eq__(self, o):
    if not isinstance(o, OperationData):
      return False
    return (self.operation == o.operation and self.project == o.project and
            self.operation_service == o.operation_service and
            self.resource_service == o.resource_service and
            self.no_followup == o.no_followup and
            self.followup_override == o.followup_override)

  def __hash__(self):
    # Hashes the operation by selfLink; equal operations share a selfLink, so
    # this stays consistent with __eq__.
    return (hash(self.operation.selfLink) ^ hash(self.project)
            ^ hash(self.operation_service) ^ hash(self.resource_service)
            ^ hash(self.no_followup) ^ hash(self.followup_override))

  def __ne__(self, o):
    # Python 2 does not derive __ne__ from __eq__.
    return not self == o

  def SetOperation(self, operation):
    """Updates the operation.

    Args:
      operation: Operation to be assigned.
    """
    self.operation = operation

  def IsGlobalOrganizationOperation(self):
    """Returns True if operation_service is globalOrganizationOperations."""
    if not hasattr(self.operation_service.client,
                   'globalOrganizationOperations'):
      return False
    return (self.operation_service ==
            self.operation_service.client.globalOrganizationOperations)

  def IsDone(self):
    """Returns true if the operation is done."""
    operation_type = self.operation_service.GetResponseType('Get')
    done = operation_type.StatusValueValuesEnum.DONE
    return self.operation.status == done

  def _SupportOperationWait(self):
    """Returns True if operation_service exposes a Wait method."""
    return 'Wait' in self.operation_service.GetMethodsList()

  def ResourceGetRequest(self):
    """Generates apitools request message to get the resource.

    Returns:
      A request message for resource_service.Get, or None when there is no
      project and the operation has no targetLink. In that case a
      '<Verb>ed.' status line is written instead.
    """

    target_link = self.operation.targetLink

    if self.project:
      request = self.resource_service.GetRequestType('Get')(
          project=self.project)
    else:
      # Gets the flexible resource ID.
      if target_link is None:
        log.status.write('{0}.\n'.format(
            _HumanFriendlyNameForOpPastTense(
                self.operation.operationType).capitalize()))
        return
      token_list = target_link.split('/')
      flexible_resource_id = token_list[-1]
      request = self.resource_service.GetRequestType('Get')(
          securityPolicy=flexible_resource_id)
    if self.operation.zone:
      request.zone = path_simplifier.Name(self.operation.zone)
    elif self.operation.region:
      request.region = path_simplifier.Name(self.operation.region)
    # The last ordered parameter of Get is the resource-name field.
    name_field = self.resource_service.GetMethodConfig('Get').ordered_params[-1]

    resource_name = self.followup_override or path_simplifier.Name(
        self.operation.targetLink)

    setattr(request, name_field, resource_name)
    return request

  def _OperationRequest(self, verb):
    """Generates apitools request message to poll the operation.

    Args:
      verb: str, The operation_service method name, 'Get' or 'Wait'.

    Returns:
      The request message, scoped to the operation's zone or region if set.
    """

    if self.project:
      request = self.operation_service.GetRequestType(verb)(
          operation=self.operation.name, project=self.project)
    else:
      # Fetches the parent ID from the operation name; assumes the
      # organization ID is its second '-'-separated token.
      token_list = self.operation.name.split('-')
      parent_id = 'organizations/' + token_list[1]
      request = self.operation_service.GetRequestType(verb)(
          operation=self.operation.name, parentId=parent_id)
    if self.operation.zone:
      request.zone = path_simplifier.Name(self.operation.zone)
    elif self.operation.region:
      request.region = path_simplifier.Name(self.operation.region)
    return request

  def OperationGetRequest(self):
    """Generates apitools request message for operations.get method."""
    return self._OperationRequest('Get')

  def OperationWaitRequest(self):
    """Generates apitools request message for operations.wait method."""
    return self._OperationRequest('Wait')

  def _CallService(self, method, request):
    """Calls method(request), recording any HttpError before re-raising it.

    Raises:
      apitools_exceptions.HttpError: after appending it to self.errors along
        with any problems already present on self.operation.
    """
    try:
      return method(request)
    except apitools_exceptions.HttpError as e:
      http_err = http_exceptions.HttpException(e)
      self.errors.append((http_err.error.status_code, http_err.message))
      _RecordProblems(self.operation, self.warnings, self.errors)
      raise

  def _PollUntilDoneUsingOperationGet(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation with the operations.get method.

    Sleeps 1, 2, ... seconds between polls, capped at
    _MAX_TIME_BETWEEN_POLLS_SEC. Stops on DONE, timeout, or an HTTP error;
    problems are recorded in self.errors / self.warnings.
    """
    get_request = self.OperationGetRequest()
    start = time_util.CurrentTimeSec()
    poll_time_interval = 0

    while True:
      if time_util.CurrentTimeSec() - start > timeout_sec:
        self.errors.append(
            (None, 'operation {} timed out'.format(self.operation.name)))
        _RecordProblems(self.operation, self.warnings, self.errors)
        return

      try:
        self.operation = self._CallService(self.operation_service.Get,
                                           get_request)
      except apitools_exceptions.HttpError:
        # Already recorded by _CallService.
        return

      if self.IsDone():
        _RecordProblems(self.operation, self.warnings, self.errors)
        return
      poll_time_interval = min(poll_time_interval + 1,
                               _MAX_TIME_BETWEEN_POLLS_SEC)
      time_util.Sleep(poll_time_interval)

  def _PollUntilDoneUsingOperationWait(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation with the operations.wait method.

    Stops on DONE, timeout, or an HTTP error; problems are recorded in
    self.errors / self.warnings.
    """
    wait_request = self.OperationWaitRequest()
    start = time_util.CurrentTimeSec()

    while not self.IsDone():
      if time_util.CurrentTimeSec() - start > timeout_sec:
        self.errors.append(
            (None, 'operation {} timed out'.format(self.operation.name)))
        _RecordProblems(self.operation, self.warnings, self.errors)
        return
      try:
        self.operation = self._CallService(self.operation_service.Wait,
                                           wait_request)
      except apitools_exceptions.HttpError:
        # Already recorded by _CallService.
        return

    _RecordProblems(self.operation, self.warnings, self.errors)

  def PollUntilDone(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation until it is done, preferring operations.wait."""
    if self.IsDone():
      return

    if self._SupportOperationWait():
      self._PollUntilDoneUsingOperationWait(timeout_sec)
    else:
      self._PollUntilDoneUsingOperationGet(timeout_sec)

  def GetResult(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Get the resource which is touched by the operation.

    Args:
      timeout_sec: int, Maximum number of seconds to poll the operation.

    Returns:
      The target resource, or None in any of these cases:
        - no_followup is set;
        - the operation has an error;
        - the operation is a deletion;
        - there is no target to fetch;
        - the follow-up GET fails (recorded in self.errors).
    """
    self.PollUntilDone(timeout_sec)
    if (self.no_followup or self.operation.error or
        _IsDeleteOp(self.operation.operationType)):
      return None
    resource_get_request = self.ResourceGetRequest()
    # Operations without a target produce no request (WaitForOperations
    # applies the same check); calling Get(None) would fail.
    if resource_get_request is None:
      return None
    try:
      return self._CallService(self.resource_service.Get,
                               resource_get_request)
    except apitools_exceptions.HttpError:
      # Already recorded in self.errors by _CallService.
      return None


def WaitForOperations(
    operations_data, http, batch_url, warnings, errors,
    progress_tracker=None, timeout=None, log_result=True):
  """Blocks until the given operations are done or until a timeout is reached.

  Args:
    operations_data: A list of OperationData objects holding Operations to poll.
    http: An HTTP object.
    batch_url: The URL to which batch requests should be sent.
    warnings: An output parameter for capturing warnings.
    errors: An output parameter for capturing errors.
    progress_tracker: progress tracker to tick while waiting for operations to
      finish.
    timeout: The maximum amount of time, in seconds, to wait for the
      operations to reach the DONE state. Defaults to _POLLING_TIMEOUT_SEC.
    log_result: Whether the Operation Waiter should print the result in past
      tense of each request.

  Yields:
    The resources pointed to by the operations' targetLink fields if
    the operation type is not delete. Only resources whose
    corresponding operations reach done are yielded. The Operation itself is
    yielded instead when its OperationData has no_followup set, or when it
    failed and always_return_operation is set.
  """
  if not operations_data:
    return
  timeout = timeout or _POLLING_TIMEOUT_SEC

  # Operation -> OperationData mapping will be used to reify operation_service
  # and resource_service from operation_service.Get(operation) response.
  # It is necessary because poll operation is returning only response, but we
  # also need to get operation details to know the service to poll for all
  # unprocessed_operations.
  operation_details = {}
  unprocessed_operations = []
  for op_data in operations_data:
    operation_details[op_data.operation.selfLink] = op_data
    unprocessed_operations.append(op_data.operation)

  start = time_util.CurrentTimeSec()
  sleep_sec = 0
  # There is only one type of operation in compute API.
  # We pick the type of the first operation in the list.
  operation_type = operations_data[0].operation_service.GetResponseType('Get')

  while unprocessed_operations:
    if progress_tracker:
      progress_tracker.Tick()
    resource_requests = []
    operation_requests = []

    log.debug('Operations to inspect: %s', unprocessed_operations)
    for operation in unprocessed_operations:
      # Reify operation
      data = operation_details[operation.selfLink]
      # Need to update the operation since old operation may not have all the
      # required information.
      data.SetOperation(operation)

      operation_service = data.operation_service
      resource_service = data.resource_service

      if operation.status == operation_type.StatusValueValuesEnum.DONE:
        # The operation has reached the DONE state, so we record any
        # problems it contains (if any) and proceed to get the target
        # resource if there were no problems and the operation is not
        # a deletion.
        _RecordProblems(operation, warnings, errors)

        # httpErrorStatusCode is set only when the operation is not
        # successful; the "error" field is also checked in case the server
        # did not set httpErrorStatusCode but the operation did fail.
        failed = ((operation.httpErrorStatusCode and
                   operation.httpErrorStatusCode != 200)  # httplib.OK
                  or operation.error)
        if failed:
          if data.always_return_operation:
            yield operation
          # Never fetch (or report success for) the target of a failed
          # operation, even after yielding it.
          continue

        # If the operation is done and we do not want to get the target
        # resource but do want to print results from the operation, we return
        # the operation.
        if data.no_followup:
          yield operation
          continue

        # We shouldn't get the target resource if the operation type
        # is delete because there will be no resource left.
        if not _IsDeleteOp(operation.operationType):
          request = data.ResourceGetRequest()
          # Some operations do not have target and should not send get request.
          if request:
            resource_requests.append((resource_service, 'Get', request))

        # Only log when there is target link in the operation.
        if operation.targetLink and log_result:
          log.status.write('{0} [{1}].\n'.format(
              _HumanFriendlyNameForOpPastTense(
                  operation.operationType).capitalize(), operation.targetLink))

      else:
        # The operation has not reached the DONE state, so we add a request
        # to poll the operation.
        # TODO(b/129413862): Global org operation service supports wait API.
        if data.IsGlobalOrganizationOperation():
          request = data.OperationGetRequest()
          operation_requests.append((operation_service, 'Get', request))
        else:
          request = data.OperationWaitRequest()
          operation_requests.append((operation_service, 'Wait', request))

    requests = resource_requests + operation_requests
    if not requests:
      break

    responses, request_errors = batch_helper.MakeRequests(
        requests=requests,
        http=http,
        batch_url=batch_url)

    errors.extend(request_errors)

    all_done = True
    unprocessed_operations = []
    for response in responses:
      if isinstance(response, operation_type):
        unprocessed_operations.append(response)
        if response.status != operation_type.StatusValueValuesEnum.DONE:
          all_done = False
      else:
        # Non-operation responses are the fetched target resources.
        yield response

    # If there are no more operations, we are done.
    if not unprocessed_operations:
      break

    # If all of the operations are done, we should ignore the timeout and ignore
    # the sleep.
    if all_done:
      continue

    # Did we time out? If so, record the operations that timed out so
    # they can be reported to the user, quoting the timeout actually used.
    if time_util.CurrentTimeSec() - start > timeout:
      log.debug('Timeout of %ss reached.', timeout)
      _RecordUnfinishedOperations(unprocessed_operations, errors, timeout)
      break

    # Sleeps before trying to poll the operations again.
    sleep_sec = min(sleep_sec + 1, _MAX_TIME_BETWEEN_POLLS_SEC)
    log.debug('Sleeping for %ss.', sleep_sec)
    time_util.Sleep(sleep_sec)