# -*- coding: utf-8 -*-
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Update cluster command."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.dataproc import dataproc as dp
from googlecloudsdk.api_lib.dataproc import exceptions
from googlecloudsdk.api_lib.dataproc import util
from googlecloudsdk.calliope import actions
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.dataproc import flags
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import log
from googlecloudsdk.core.util import times
import six


class Update(base.UpdateCommand):
  """Update labels and/or the number of worker nodes in a cluster.

  Update the number of worker nodes and/or the labels in a cluster.

  ## EXAMPLES

  To resize a cluster, run:

    $ {command} my_cluster --region=us-central1 --num-workers=5

  To change the number of preemptible workers in a cluster, run:

    $ {command} my_cluster --region=us-central1 --num-preemptible-workers=5

  To add the label 'customer=acme' to a cluster, run:

    $ {command} my_cluster --region=us-central1 --update-labels=customer=acme

  To update the label 'customer=ackme' to 'customer=acme', run:

    $ {command} my_cluster --region=us-central1 --update-labels=customer=acme

  To remove the label whose key is 'customer', run:

    $ {command} my_cluster --region=us-central1 --remove-labels=customer
  """

  @classmethod
  def Args(cls, parser):
    """Registers the command's flags on the given argparse parser.

    Args:
      cls: This command class (used to pick the Dataproc API release track).
      parser: The argparse parser to add the flags to.
    """
    dataproc = dp.Dataproc(cls.ReleaseTrack())
    base.ASYNC_FLAG.AddToParser(parser)
    # Allow the user to specify new labels as well as update/remove existing
    labels_util.AddUpdateLabelsFlags(parser)
    # Graceful decommissioning timeouts can be up to 24 hours + add 1 hour for
    # deleting VMs, etc.
    flags.AddTimeoutFlag(parser, default='25h')
    flags.AddClusterResourceArg(parser, 'update', dataproc.api_version)
    parser.add_argument(
        '--num-workers',
        type=int,
        help='The new number of worker nodes in the cluster.')
    # --num-preemptible-workers is the deprecated (hidden) spelling of
    # --num-secondary-workers; the mutex group forbids passing both at once.
    num_secondary_workers = parser.add_argument_group(mutex=True)
    num_secondary_workers.add_argument(
        '--num-preemptible-workers',
        action=actions.DeprecationAction(
            '--num-preemptible-workers',
            warn=('The `--num-preemptible-workers` flag is deprecated. '
                  'Use the `--num-secondary-workers` flag instead.')),
        type=int,
        hidden=True,
        help='The new number of preemptible worker nodes in the cluster.')
    num_secondary_workers.add_argument(
        '--num-secondary-workers',
        type=int,
        help='The new number of secondary worker nodes in the cluster.')

    parser.add_argument(
        '--graceful-decommission-timeout',
        type=arg_parsers.Duration(lower_bound='0s', upper_bound='1d'),
        help="""
              The graceful decommission timeout for decommissioning Node Managers
              in the cluster, used when removing nodes. Graceful decommissioning
              allows removing nodes from the cluster without interrupting jobs in
              progress. Timeout specifies how long to wait for jobs in progress to
              finish before forcefully removing nodes (and potentially
              interrupting jobs). Timeout defaults to 0 if not set (for forceful
              decommission), and the maximum allowed timeout is 1 day.
              See $ gcloud topic datetimes for information on duration formats.
              """)

    # --max-idle sets an idle-TTL; --no-max-idle clears it. Mutually exclusive.
    idle_delete_group = parser.add_mutually_exclusive_group()
    idle_delete_group.add_argument(
        '--max-idle',
        type=arg_parsers.Duration(),
        help="""\
        The duration before cluster is auto-deleted after last job finished,
        such as "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    idle_delete_group.add_argument(
        '--no-max-idle',
        action='store_true',
        help="""\
        Cancels the cluster auto-deletion by cluster idle duration (configured
        by --max-idle flag)
        """)

    # Auto-delete may be given as a TTL (--max-age) or an absolute timestamp
    # (--expiration-time); --no-max-age clears it. All three are exclusive.
    auto_delete_group = parser.add_mutually_exclusive_group()
    auto_delete_group.add_argument(
        '--max-age',
        type=arg_parsers.Duration(),
        help="""\
        The lifespan of the cluster before it is auto-deleted, such as
        "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    auto_delete_group.add_argument(
        '--expiration-time',
        type=arg_parsers.Datetime.Parse,
        help="""\
        The time when cluster will be auto-deleted, such as
        "2017-08-29T18:52:51.142Z". See $ gcloud topic datetimes for
        information on time formats.
        """)
    auto_delete_group.add_argument(
        '--no-max-age',
        action='store_true',
        help="""\
        Cancels the cluster auto-deletion by maximum cluster age (configured by
        --max-age or --expiration-time flags)
        """)

    # Can only specify one of --autoscaling-policy or --disable-autoscaling
    autoscaling_group = parser.add_mutually_exclusive_group()
    flags.AddAutoscalingPolicyResourceArgForCluster(
        autoscaling_group, api_version='v1')
    autoscaling_group.add_argument(
        '--disable-autoscaling',
        action='store_true',
        help="""\
        Disable autoscaling, if it is enabled. This is an alias for passing the
        empty string to --autoscaling-policy'.
        """)

  def Run(self, args):
    """Issues the clusters.patch request built from the parsed flags.

    Builds a partial Cluster message plus an update mask listing only the
    fields the user asked to change, then calls the Dataproc Patch API.

    Args:
      args: The parsed argument namespace for this invocation.

    Returns:
      The updated Cluster message fetched after the operation completes, or
      None when --async is set (the operation is only started).

    Raises:
      exceptions.ArgumentError: If no updatable cluster parameter was given.
    """
    dataproc = dp.Dataproc(self.ReleaseTrack())

    cluster_ref = args.CONCEPTS.cluster.Parse()

    # Only the fields named in changed_fields (the update mask, in the API's
    # snake_case path syntax) are applied by the Patch call.
    cluster_config = dataproc.messages.ClusterConfig()
    changed_fields = []

    has_changes = False

    if args.num_workers is not None:
      worker_config = dataproc.messages.InstanceGroupConfig(
          numInstances=args.num_workers)
      cluster_config.workerConfig = worker_config
      changed_fields.append('config.worker_config.num_instances')
      has_changes = True

    # The deprecated --num-preemptible-workers flag takes precedence; the two
    # flags are in a mutex group, so at most one of them is set.
    num_secondary_workers = _FirstNonNone(args.num_preemptible_workers,
                                          args.num_secondary_workers)
    if num_secondary_workers is not None:
      worker_config = dataproc.messages.InstanceGroupConfig(
          numInstances=num_secondary_workers)
      cluster_config.secondaryWorkerConfig = worker_config
      changed_fields.append('config.secondary_worker_config.num_instances')
      has_changes = True

    if args.autoscaling_policy:
      cluster_config.autoscalingConfig = dataproc.messages.AutoscalingConfig(
          policyUri=args.CONCEPTS.autoscaling_policy.Parse().RelativeName())
      changed_fields.append('config.autoscaling_config.policy_uri')
      has_changes = True
    elif args.autoscaling_policy == '' or args.disable_autoscaling:  # pylint: disable=g-explicit-bool-comparison
      # Disabling autoscaling. Don't need to explicitly set
      # cluster_config.autoscaling_config to None; listing the field in the
      # update mask while leaving it unset clears it on the server.
      changed_fields.append('config.autoscaling_config.policy_uri')
      has_changes = True

    # TTL flags serialize as a seconds-suffixed duration string ('<n>s');
    # the --no-* flags clear the corresponding field via the update mask.
    lifecycle_config = dataproc.messages.LifecycleConfig()
    changed_config = False
    if args.max_age is not None:
      lifecycle_config.autoDeleteTtl = six.text_type(args.max_age) + 's'
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.expiration_time is not None:
      lifecycle_config.autoDeleteTime = times.FormatDateTime(
          args.expiration_time)
      changed_fields.append('config.lifecycle_config.auto_delete_time')
      changed_config = True
    if args.max_idle is not None:
      lifecycle_config.idleDeleteTtl = six.text_type(args.max_idle) + 's'
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True
    if args.no_max_age:
      lifecycle_config.autoDeleteTtl = None
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.no_max_idle:
      lifecycle_config.idleDeleteTtl = None
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True
    if changed_config:
      cluster_config.lifecycleConfig = lifecycle_config
      has_changes = True

    # Put in a thunk so we only make this call if needed
    def _GetCurrentLabels():
      # We need to fetch cluster first so we know what the labels look like.
      # The labels_util will fill out the proto for us with all the updates and
      # removals, but first we need to provide the current state of the labels
      get_cluster_request = (
          dataproc.messages.DataprocProjectsRegionsClustersGetRequest(
              projectId=cluster_ref.projectId,
              region=cluster_ref.region,
              clusterName=cluster_ref.clusterName))
      current_cluster = dataproc.client.projects_regions_clusters.Get(
          get_cluster_request)
      return current_cluster.labels
    labels_update = labels_util.ProcessUpdateArgsLazy(
        args, dataproc.messages.Cluster.LabelsValue,
        orig_labels_thunk=_GetCurrentLabels)
    if labels_update.needs_update:
      has_changes = True
      changed_fields.append('labels')
    labels = labels_update.GetOrNone()

    if not has_changes:
      raise exceptions.ArgumentError(
          'Must specify at least one cluster parameter to update.')

    cluster = dataproc.messages.Cluster(
        config=cluster_config,
        clusterName=cluster_ref.clusterName,
        labels=labels,
        projectId=cluster_ref.projectId)

    # requestId makes the Patch idempotent if the request is retried.
    request = dataproc.messages.DataprocProjectsRegionsClustersPatchRequest(
        clusterName=cluster_ref.clusterName,
        region=cluster_ref.region,
        projectId=cluster_ref.projectId,
        cluster=cluster,
        updateMask=','.join(changed_fields),
        requestId=util.GetUniqueId())

    if args.graceful_decommission_timeout is not None:
      request.gracefulDecommissionTimeout = (
          six.text_type(args.graceful_decommission_timeout) + 's')

    operation = dataproc.client.projects_regions_clusters.Patch(request)

    # In async mode, report the operation name and return without waiting.
    if args.async_:
      log.status.write(
          'Updating [{0}] with operation [{1}].'.format(
              cluster_ref, operation.name))
      return

    util.WaitForOperation(
        dataproc,
        operation,
        message='Waiting for cluster update operation',
        timeout_s=args.timeout)

    # Re-fetch the cluster so the caller sees its post-update state.
    request = dataproc.messages.DataprocProjectsRegionsClustersGetRequest(
        projectId=cluster_ref.projectId,
        region=cluster_ref.region,
        clusterName=cluster_ref.clusterName)
    cluster = dataproc.client.projects_regions_clusters.Get(request)
    log.UpdatedResource(cluster_ref)
    return cluster


def _FirstNonNone(first, second):
  """Returns `first` unless it is None, in which case returns `second`."""
  return first if first is not None else second