# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common utility functions for Composer environment patch commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.composer import environments_util as environments_api_util
from googlecloudsdk.api_lib.composer import operations_util as operations_api_util
from googlecloudsdk.api_lib.composer import util as api_util
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.composer import util as command_util
from googlecloudsdk.core import log
import six


def Patch(env_resource,
          field_mask,
          patch,
          is_async,
          release_track=base.ReleaseTrack.GA):
  """Patches an Environment, optionally waiting for the operation to complete.

  This function is intended to perform the common work of an Environment
  patching command's Run method. That is, calling the patch API method and
  waiting for the result or immediately returning the Operation.

  Args:
    env_resource: googlecloudsdk.core.resources.Resource, Resource representing
      the Environment to be patched
    field_mask: str, a field mask string containing comma-separated paths to be
      patched
    patch: Environment, a patch Environment containing updated values to apply
    is_async: bool, whether or not to perform the patch asynchronously
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    an Operation corresponding to the Patch call if `is_async` is True;
    otherwise None is returned after the operation is complete

  Raises:
    command_util.Error: if `is_async` is False and the operation encounters
      an error
  """
  operation = environments_api_util.Patch(
      env_resource, patch, field_mask, release_track=release_track)
  details = 'with operation [{0}]'.format(operation.name)
  if is_async:
    # Asynchronous mode: report the pending update and hand the Operation back
    # to the caller without waiting for it.
    log.UpdatedResource(
        env_resource.RelativeName(),
        kind='environment',
        is_async=True,
        details=details)
    return operation

  try:
    operations_api_util.WaitForOperation(
        operation,
        'Waiting for [{}] to be updated with [{}]'.format(
            env_resource.RelativeName(), operation.name),
        release_track=release_track)
  except command_util.Error as e:
    # Re-raise with the environment name prepended so the user can tell which
    # resource failed to update.
    raise command_util.Error('Error updating [{}]: {}'.format(
        env_resource.RelativeName(), six.text_type(e)))


def ConstructPatch(env_ref=None,
                   node_count=None,
                   update_pypi_packages_from_file=None,
                   clear_pypi_packages=None,
                   remove_pypi_packages=None,
                   update_pypi_packages=None,
                   clear_labels=None,
                   remove_labels=None,
                   update_labels=None,
                   clear_airflow_configs=None,
                   remove_airflow_configs=None,
                   update_airflow_configs=None,
                   clear_env_variables=None,
                   remove_env_variables=None,
                   update_env_variables=None,
                   update_image_version=None,
                   update_web_server_access_control=None,
                   cloud_sql_machine_type=None,
                   web_server_machine_type=None,
                   scheduler_cpu=None,
                   worker_cpu=None,
                   min_workers=None,
                   max_workers=None,
                   maintenance_window_start=None,
                   maintenance_window_end=None,
                   maintenance_window_recurrence=None,
                   release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch.

  The update types are checked in the order of the statements below and only
  the first one that is specified is turned into a patch; any further update
  types passed in the same call are ignored.

  Args:
    env_ref: resource argument, Environment resource argument for environment
      being updated.
    node_count: int, the desired node count
    update_pypi_packages_from_file: str, path to local requirements file
      containing desired pypi dependencies.
    clear_pypi_packages: bool, whether to uninstall all PyPI packages.
    remove_pypi_packages: iterable(string), Iterable of PyPI packages to
      uninstall.
    update_pypi_packages: {string: string}, dict mapping PyPI package name to
      extras and version specifier.
    clear_labels: bool, whether to clear the labels dictionary.
    remove_labels: iterable(string), Iterable of label names to remove.
    update_labels: {string: string}, dict of label names and values to set.
    clear_airflow_configs: bool, whether to clear the Airflow configs
      dictionary.
    remove_airflow_configs: iterable(string), Iterable of Airflow config
      property names to remove.
    update_airflow_configs: {string: string}, dict of Airflow config property
      names and values to set.
    clear_env_variables: bool, whether to clear the environment variables
      dictionary.
    remove_env_variables: iterable(string), Iterable of environment variables
      to remove.
    update_env_variables: {string: string}, dict of environment variable
      names and values to set.
    update_image_version: string, image version to use for environment upgrade
    update_web_server_access_control: [{string: string}], Webserver access
      control to set
    cloud_sql_machine_type: str or None, Cloud SQL machine type used by the
      Airflow database.
    web_server_machine_type: str or None, machine type used by the Airflow web
      server
    scheduler_cpu: float or None, CPU allocated to Airflow scheduler.
      Can be specified only in Composer 2.0.0.
    worker_cpu: float or None, CPU allocated to each Airflow worker.
      Can be specified only in Composer 2.0.0.
    min_workers: int or None, minimum number of workers
      in the Environment. Can be specified only in Composer 2.0.0.
    max_workers: int or None, maximum number of workers
      in the Environment. Can be specified only in Composer 2.0.0.
    maintenance_window_start: Datetime or None, a starting date of the
      maintenance window.
    maintenance_window_end: Datetime or None, an ending date of the maintenance
      window.
    maintenance_window_recurrence: str or None, recurrence RRULE for the
      maintenance window.
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.

  Raises:
    command_util.Error: if no update type is specified
  """
  if node_count:
    return _ConstructNodeCountPatch(node_count, release_track=release_track)
  if update_pypi_packages_from_file:
    # A requirements file replaces the whole package set: clear everything,
    # remove nothing, then install what the file lists.
    return _ConstructPyPiPackagesPatch(
        True, [],
        command_util.ParseRequirementsFile(update_pypi_packages_from_file),
        release_track=release_track)
  if clear_pypi_packages or remove_pypi_packages or update_pypi_packages:
    return _ConstructPyPiPackagesPatch(
        clear_pypi_packages,
        remove_pypi_packages,
        update_pypi_packages,
        release_track=release_track)
  if clear_labels or remove_labels or update_labels:
    return _ConstructLabelsPatch(
        clear_labels, remove_labels, update_labels, release_track=release_track)
  if (clear_airflow_configs or remove_airflow_configs or
      update_airflow_configs):
    return _ConstructAirflowConfigsPatch(
        clear_airflow_configs,
        remove_airflow_configs,
        update_airflow_configs,
        release_track=release_track)
  if clear_env_variables or remove_env_variables or update_env_variables:
    return _ConstructEnvVariablesPatch(
        env_ref,
        clear_env_variables,
        remove_env_variables,
        update_env_variables,
        release_track=release_track)
  if update_image_version:
    return _ConstructImageVersionPatch(
        update_image_version, release_track=release_track)
  # Compared against None (not truthiness) so that an empty list is still
  # treated as an explicit access control update.
  if update_web_server_access_control is not None:
    return _ConstructWebServerAccessControlPatch(
        update_web_server_access_control, release_track=release_track)
  if cloud_sql_machine_type:
    return _ConstructCloudSqlMachineTypePatch(
        cloud_sql_machine_type, release_track=release_track)
  if web_server_machine_type:
    return _ConstructWebServerMachineTypePatch(
        web_server_machine_type, release_track=release_track)
  if scheduler_cpu or worker_cpu or min_workers or max_workers:
    return _ConstructAutoscalingPatch(
        scheduler_cpu=scheduler_cpu,
        worker_cpu=worker_cpu,
        worker_min_count=min_workers,
        worker_max_count=max_workers,
        release_track=release_track)
  # The maintenance window is only patched when all three of its components
  # are provided.
  if (maintenance_window_start and maintenance_window_end and
      maintenance_window_recurrence):
    return _ConstructMaintenanceWindowPatch(
        maintenance_window_start,
        maintenance_window_end,
        maintenance_window_recurrence,
        release_track=release_track)
  raise command_util.Error(
      'Cannot update Environment with no update type specified.')


def _ConstructNodeCountPatch(node_count, release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch for node count.

  Args:
    node_count: int, the desired node count
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  config = messages.EnvironmentConfig(nodeCount=node_count)
  return 'config.node_count', messages.Environment(config=config)


def _ConstructPyPiPackagesPatch(clear_pypi_packages,
                                remove_pypi_packages,
                                update_pypi_packages,
                                release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch for partially updating PyPI packages.

  Args:
    clear_pypi_packages: bool, whether to clear the PyPI packages dictionary.
    remove_pypi_packages: iterable(string), Iterable of PyPI package names to
      remove.
    update_pypi_packages: {string: string}, dict mapping PyPI package name
      to optional extras and version specifier.
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  env_cls = messages.Environment
  pypi_packages_cls = messages.SoftwareConfig.PypiPackagesValue
  entry_cls = pypi_packages_cls.AdditionalProperty

  def _BuildEnv(entries):
    """Wraps the given map entries in an Environment patch message."""
    software_config = messages.SoftwareConfig(
        pypiPackages=pypi_packages_cls(additionalProperties=entries))
    config = messages.EnvironmentConfig(softwareConfig=software_config)
    return env_cls(config=config)

  return command_util.BuildPartialUpdate(
      clear_pypi_packages, remove_pypi_packages, update_pypi_packages,
      'config.software_config.pypi_packages', entry_cls, _BuildEnv)


def _ConstructLabelsPatch(clear_labels,
                          remove_labels,
                          update_labels,
                          release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch for updating labels.

  Args:
    clear_labels: bool, whether to clear the labels dictionary.
    remove_labels: iterable(string), Iterable of label names to remove.
    update_labels: {string: string}, dict of label names and values to set.
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  env_cls = messages.Environment
  entry_cls = env_cls.LabelsValue.AdditionalProperty

  def _BuildEnv(entries):
    """Wraps the given map entries in an Environment patch message."""
    return env_cls(labels=env_cls.LabelsValue(additionalProperties=entries))

  return command_util.BuildPartialUpdate(clear_labels, remove_labels,
                                         update_labels, 'labels', entry_cls,
                                         _BuildEnv)


def _ConstructAirflowConfigsPatch(clear_airflow_configs,
                                  remove_airflow_configs,
                                  update_airflow_configs,
                                  release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch for updating Airflow configs.

  Args:
    clear_airflow_configs: bool, whether to clear the Airflow configs
      dictionary.
    remove_airflow_configs: iterable(string), Iterable of Airflow config
      property names to remove.
    update_airflow_configs: {string: string}, dict of Airflow config property
      names and values to set.
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  env_cls = messages.Environment
  airflow_config_overrides_cls = (
      messages.SoftwareConfig.AirflowConfigOverridesValue)
  entry_cls = airflow_config_overrides_cls.AdditionalProperty

  def _BuildEnv(entries):
    """Wraps the given map entries in an Environment patch message."""
    software_config = messages.SoftwareConfig(
        airflowConfigOverrides=airflow_config_overrides_cls(
            additionalProperties=entries))
    config = messages.EnvironmentConfig(softwareConfig=software_config)
    return env_cls(config=config)

  return command_util.BuildPartialUpdate(
      clear_airflow_configs, remove_airflow_configs, update_airflow_configs,
      'config.software_config.airflow_config_overrides', entry_cls, _BuildEnv)


def _ConstructEnvVariablesPatch(env_ref,
                                clear_env_variables,
                                remove_env_variables,
                                update_env_variables,
                                release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch for updating environment variables.

  Note that environment variable updates do not support partial update masks
  unlike other map updates due to comments in (b/78298321). For this reason, we
  need to retrieve the Environment, apply an update on EnvVariable dictionary,
  and patch the entire dictionary. The potential race condition here
  (environment variables being updated between when we retrieve them and when we
  send patch request) is not a concern since environment variable updates take
  5 mins to complete, and environments cannot be updated while already in the
  updating state.

  Args:
    env_ref: resource argument, Environment resource argument for environment
      being updated.
    clear_env_variables: bool, whether to clear the environment variables
      dictionary.
    remove_env_variables: iterable(string), Iterable of environment variable
      names to remove.
    update_env_variables: {string: string}, dict of environment variable names
      and values to set.
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  # Fetch the current variables so the full, merged map can be sent back.
  env_obj = environments_api_util.Get(env_ref, release_track=release_track)
  initial_env_var_value = env_obj.config.softwareConfig.envVariables
  initial_env_var_list = (
      initial_env_var_value.additionalProperties
      if initial_env_var_value else [])

  messages = api_util.GetMessagesModule(release_track=release_track)
  env_cls = messages.Environment
  env_variables_cls = messages.SoftwareConfig.EnvVariablesValue
  entry_cls = env_variables_cls.AdditionalProperty

  def _BuildEnv(entries):
    """Wraps the given map entries in an Environment patch message."""
    software_config = messages.SoftwareConfig(
        envVariables=env_variables_cls(additionalProperties=entries))
    config = messages.EnvironmentConfig(softwareConfig=software_config)
    return env_cls(config=config)

  return ('config.software_config.env_variables',
          command_util.BuildFullMapUpdate(
              clear_env_variables, remove_env_variables, update_env_variables,
              initial_env_var_list, entry_cls, _BuildEnv))


def _ConstructImageVersionPatch(update_image_version,
                                release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch for environment image version.

  Args:
    update_image_version: string, the target image version.
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  software_config = messages.SoftwareConfig(imageVersion=update_image_version)
  config = messages.EnvironmentConfig(softwareConfig=software_config)

  return 'config.software_config.image_version', messages.Environment(
      config=config)


def _ConstructWebServerAccessControlPatch(web_server_access_control,
                                          release_track):
  """Constructs an environment patch for web server network access control.

  Args:
    web_server_access_control: [{string: string}], the target list of IP ranges.
    release_track: base.ReleaseTrack, the release track of command. It dictates
      which Composer client library is used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  access_control = environments_api_util.BuildWebServerNetworkAccessControl(
      web_server_access_control, release_track)
  config = messages.EnvironmentConfig(
      webServerNetworkAccessControl=access_control)
  return 'config.web_server_network_access_control', messages.Environment(
      config=config)


def _ConstructCloudSqlMachineTypePatch(cloud_sql_machine_type, release_track):
  """Constructs an environment patch for Cloud SQL machine type.

  Args:
    cloud_sql_machine_type: str or None, Cloud SQL machine type used by the
      Airflow database.
    release_track: base.ReleaseTrack, the release track of command. It dictates
      which Composer client library is used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  config = messages.EnvironmentConfig(
      databaseConfig=messages.DatabaseConfig(
          machineType=cloud_sql_machine_type))
  return 'config.database_config.machine_type', messages.Environment(
      config=config)


def _ConstructWebServerMachineTypePatch(web_server_machine_type, release_track):
  """Constructs an environment patch for Airflow web server machine type.

  Args:
    web_server_machine_type: str or None, machine type used by the Airflow web
      server.
    release_track: base.ReleaseTrack, the release track of command. It dictates
      which Composer client library is used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  config = messages.EnvironmentConfig(
      webServerConfig=messages.WebServerConfig(
          machineType=web_server_machine_type))
  return 'config.web_server_config.machine_type', messages.Environment(
      config=config)


def _ConstructAutoscalingPatch(scheduler_cpu, worker_cpu, worker_min_count,
                               worker_max_count, release_track):
  """Constructs an environment patch for workloads (autoscaling) configuration.

  The field mask lists only the workloads fields whose value is specified
  (truthy). When all four are specified, the mask is collapsed to the parent
  `config.workloads_config` path.

  Args:
    scheduler_cpu: float or None, CPU allocated to Airflow scheduler.
      Can be specified only in Composer 2.0.0.
    worker_cpu: float or None, CPU allocated to each Airflow worker.
      Can be specified only in Composer 2.0.0.
    worker_min_count: int or None, minimum number of workers
      in the Environment. Can be specified only in Composer 2.0.0.
    worker_max_count: int or None, maximum number of workers
      in the Environment. Can be specified only in Composer 2.0.0.
    release_track: base.ReleaseTrack, the release track of command. It dictates
      which Composer client library is used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)
  config = messages.EnvironmentConfig(
      workloadsConfig=messages.WorkloadsConfig(
          schedulerCpu=scheduler_cpu,
          workerCpu=worker_cpu,
          workerMinCount=worker_min_count,
          workerMaxCount=worker_max_count))

  mask_prefix = 'config.workloads_config'
  # Order matters: it determines the order of the paths in the field mask.
  field_values = (
      ('scheduler_cpu', scheduler_cpu),
      ('worker_cpu', worker_cpu),
      ('worker_min_count', worker_min_count),
      ('worker_max_count', worker_max_count),
  )
  paths = [
      '{}.{}'.format(mask_prefix, field_name)
      for field_name, value in field_values
      if value
  ]
  if len(paths) == len(field_values):
    # Every workloads field is being set; patch the whole sub-message.
    mask = mask_prefix
  else:
    mask = ','.join(paths)
  return mask, messages.Environment(config=config)


def _ConstructMaintenanceWindowPatch(maintenance_window_start,
                                     maintenance_window_end,
                                     maintenance_window_recurrence,
                                     release_track=base.ReleaseTrack.GA):
  """Constructs an environment patch for updating maintenance window.

  Args:
    maintenance_window_start: Datetime or None, a starting date of the
      maintenance window.
    maintenance_window_end: Datetime or None, an ending date of the maintenance
      window.
    maintenance_window_recurrence: str or None, recurrence RRULE for the
      maintenance window.
    release_track: base.ReleaseTrack, the release track of command. Will dictate
      which Composer client library will be used.

  Returns:
    (str, Environment), the field mask and environment to use for update.
  """
  messages = api_util.GetMessagesModule(release_track=release_track)

  # The message fields are populated with ISO 8601 strings, not the datetime
  # objects themselves.
  window_value = messages.MaintenanceWindow(
      startTime=maintenance_window_start.isoformat(),
      endTime=maintenance_window_end.isoformat(),
      recurrence=maintenance_window_recurrence)
  config = messages.EnvironmentConfig(maintenanceWindow=window_value)

  return 'config.maintenance_window', messages.Environment(config=config)