# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Update cluster command."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.dataproc import dataproc as dp
from googlecloudsdk.api_lib.dataproc import exceptions
from googlecloudsdk.api_lib.dataproc import util
from googlecloudsdk.calliope import actions
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.dataproc import flags
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import log
from googlecloudsdk.core.util import times
import six


class Update(base.UpdateCommand):
  """Update labels and/or the number of worker nodes in a cluster.

  Update the number of worker nodes and/or the labels in a cluster.

  ## EXAMPLES

  To resize a cluster, run:

    $ {command} my_cluster --region=us-central1 --num-workers=5

  To change the number of preemptible workers in a cluster, run:

    $ {command} my_cluster --region=us-central1 --num-preemptible-workers=5

  To add the label 'customer=acme' to a cluster, run:

    $ {command} my_cluster --region=us-central1 --update-labels=customer=acme

  To update the label 'customer=ackme' to 'customer=acme', run:

    $ {command} my_cluster --region=us-central1 --update-labels=customer=acme

  To remove the label whose key is 'customer', run:

    $ {command} my_cluster --region=us-central1 --remove-labels=customer
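
  To configure the cluster to delete itself after being idle for 30 minutes,
  run:

    $ {command} my_cluster --region=us-central1 --max-idle=30m

  To resize the secondary worker group to 2 nodes, waiting up to one hour for
  jobs in progress to finish, run:

    $ {command} my_cluster --region=us-central1 --num-secondary-workers=2 --graceful-decommission-timeout=1h

  To disable autoscaling on a cluster, run:

    $ {command} my_cluster --region=us-central1 --disable-autoscaling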

  """

  @classmethod
  def Args(cls, parser):
    dataproc = dp.Dataproc(cls.ReleaseTrack())
    base.ASYNC_FLAG.AddToParser(parser)
    # Allow the user to specify new labels as well as update/remove existing
    # ones.
    labels_util.AddUpdateLabelsFlags(parser)
    # Graceful decommissioning timeouts can be up to 24 hours; add 1 hour for
    # deleting VMs, etc.
    flags.AddTimeoutFlag(parser, default='25h')
    flags.AddClusterResourceArg(parser, 'update', dataproc.api_version)
    parser.add_argument(
        '--num-workers',
        type=int,
        help='The new number of worker nodes in the cluster.')
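    # --num-preemptible-workers and --num-secondary-workers are two names for
    # the same setting, so they share a mutex group and only one may be given.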
    num_secondary_workers = parser.add_argument_group(mutex=True)
    num_secondary_workers.add_argument(
        '--num-preemptible-workers',
        action=actions.DeprecationAction(
            '--num-preemptible-workers',
            warn=('The `--num-preemptible-workers` flag is deprecated. '
                  'Use the `--num-secondary-workers` flag instead.')),
        type=int,
        hidden=True,
        help='The new number of preemptible worker nodes in the cluster.')
    num_secondary_workers.add_argument(
        '--num-secondary-workers',
        type=int,
        help='The new number of secondary worker nodes in the cluster.')

    parser.add_argument(
        '--graceful-decommission-timeout',
        type=arg_parsers.Duration(lower_bound='0s', upper_bound='1d'),
        help="""
              The graceful decommission timeout for decommissioning Node Managers
              in the cluster, used when removing nodes. Graceful decommissioning
              allows removing nodes from the cluster without interrupting jobs in
              progress. The timeout specifies how long to wait for jobs in
              progress to finish before forcefully removing nodes (and
              potentially interrupting jobs). The timeout defaults to 0 if not
              set (for forceful decommission), and the maximum allowed timeout
              is 1 day.
              See $ gcloud topic datetimes for information on duration formats.
              """)

    idle_delete_group = parser.add_mutually_exclusive_group()
    idle_delete_group.add_argument(
        '--max-idle',
        type=arg_parsers.Duration(),
        help="""\
        The duration after the last job finishes before the cluster is
        auto-deleted, such as "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    idle_delete_group.add_argument(
        '--no-max-idle',
        action='store_true',
        help="""\
        Cancels the cluster auto-deletion by cluster idle duration (configured
        by the --max-idle flag).
        """)

    auto_delete_group = parser.add_mutually_exclusive_group()
    auto_delete_group.add_argument(
        '--max-age',
        type=arg_parsers.Duration(),
        help="""\
        The lifespan of the cluster before it is auto-deleted, such as
        "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    auto_delete_group.add_argument(
        '--expiration-time',
        type=arg_parsers.Datetime.Parse,
        help="""\
        The time when the cluster will be auto-deleted, such as
        "2017-08-29T18:52:51.142Z". See $ gcloud topic datetimes for
        information on time formats.
        """)
    auto_delete_group.add_argument(
        '--no-max-age',
        action='store_true',
        help="""\
        Cancels the cluster auto-deletion by maximum cluster age (configured
        by the --max-age or --expiration-time flags).
        """)

    # Only one of --autoscaling-policy or --disable-autoscaling may be
    # specified.
    autoscaling_group = parser.add_mutually_exclusive_group()
    flags.AddAutoscalingPolicyResourceArgForCluster(
        autoscaling_group, api_version='v1')
    autoscaling_group.add_argument(
        '--disable-autoscaling',
        action='store_true',
        help="""\
        Disable autoscaling, if it is enabled. This is an alias for passing the
        empty string to --autoscaling-policy.
        """)

  def Run(self, args):
    dataproc = dp.Dataproc(self.ReleaseTrack())

    cluster_ref = args.CONCEPTS.cluster.Parse()

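    # Each field updated below is also recorded in changed_fields; the list
    # becomes the FieldMask for the PATCH request, so the server only touches
    # the fields named there.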
    cluster_config = dataproc.messages.ClusterConfig()
    changed_fields = []

    has_changes = False

    if args.num_workers is not None:
      worker_config = dataproc.messages.InstanceGroupConfig(
          numInstances=args.num_workers)
      cluster_config.workerConfig = worker_config
      changed_fields.append('config.worker_config.num_instances')
      has_changes = True

    num_secondary_workers = _FirstNonNone(args.num_preemptible_workers,
                                          args.num_secondary_workers)
    if num_secondary_workers is not None:
      worker_config = dataproc.messages.InstanceGroupConfig(
          numInstances=num_secondary_workers)
      cluster_config.secondaryWorkerConfig = worker_config
      changed_fields.append(
          'config.secondary_worker_config.num_instances')
      has_changes = True

    if args.autoscaling_policy:
      cluster_config.autoscalingConfig = dataproc.messages.AutoscalingConfig(
          policyUri=args.CONCEPTS.autoscaling_policy.Parse().RelativeName())
      changed_fields.append('config.autoscaling_config.policy_uri')
      has_changes = True
    elif args.autoscaling_policy == '' or args.disable_autoscaling:  # pylint: disable=g-explicit-bool-comparison
      # Disabling autoscaling. Don't need to explicitly set
      # cluster_config.autoscaling_config to None.
      changed_fields.append('config.autoscaling_config.policy_uri')
      has_changes = True

    lifecycle_config = dataproc.messages.LifecycleConfig()
    changed_config = False
    if args.max_age is not None:
      lifecycle_config.autoDeleteTtl = six.text_type(args.max_age) + 's'
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.expiration_time is not None:
      lifecycle_config.autoDeleteTime = times.FormatDateTime(
          args.expiration_time)
      changed_fields.append('config.lifecycle_config.auto_delete_time')
      changed_config = True
    if args.max_idle is not None:
      lifecycle_config.idleDeleteTtl = six.text_type(args.max_idle) + 's'
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True
    if args.no_max_age:
      lifecycle_config.autoDeleteTtl = None
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.no_max_idle:
      lifecycle_config.idleDeleteTtl = None
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True
    if changed_config:
      cluster_config.lifecycleConfig = lifecycle_config
      has_changes = True

    # Put in a thunk so we only make this call if needed.
    def _GetCurrentLabels():
      # We need to fetch the cluster first so we know what the labels look
      # like. labels_util will fill out the proto for us with all the updates
      # and removals, but first we need to provide the current state of the
      # labels.
      get_cluster_request = (
          dataproc.messages.DataprocProjectsRegionsClustersGetRequest(
              projectId=cluster_ref.projectId,
              region=cluster_ref.region,
              clusterName=cluster_ref.clusterName))
      current_cluster = dataproc.client.projects_regions_clusters.Get(
          get_cluster_request)
      return current_cluster.labels
    labels_update = labels_util.ProcessUpdateArgsLazy(
        args, dataproc.messages.Cluster.LabelsValue,
        orig_labels_thunk=_GetCurrentLabels)
    if labels_update.needs_update:
      has_changes = True
      changed_fields.append('labels')
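    # GetOrNone returns the new label map, or None when no label changes were
    # requested, so unchanged labels are simply omitted from the patch.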
    labels = labels_update.GetOrNone()

    if not has_changes:
      raise exceptions.ArgumentError(
          'Must specify at least one cluster parameter to update.')

    cluster = dataproc.messages.Cluster(
        config=cluster_config,
        clusterName=cluster_ref.clusterName,
        labels=labels,
        projectId=cluster_ref.projectId)

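    # updateMask names exactly the fields to patch; requestId lets the service
    # recognize a retried request and avoid applying the same update twice.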
    request = dataproc.messages.DataprocProjectsRegionsClustersPatchRequest(
        clusterName=cluster_ref.clusterName,
        region=cluster_ref.region,
        projectId=cluster_ref.projectId,
        cluster=cluster,
        updateMask=','.join(changed_fields),
        requestId=util.GetUniqueId())

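    # arg_parsers.Duration() yields a number of seconds; the API expects a
    # Duration string, so append 's' (e.g. 3600 -> '3600s').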
    if args.graceful_decommission_timeout is not None:
      request.gracefulDecommissionTimeout = (
          six.text_type(args.graceful_decommission_timeout) + 's')

    operation = dataproc.client.projects_regions_clusters.Patch(request)

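    # With --async, report the operation name and return without waiting for
    # the update to finish.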
    if args.async_:
      log.status.write(
          'Updating [{0}] with operation [{1}].'.format(
              cluster_ref, operation.name))
      return

    util.WaitForOperation(
        dataproc,
        operation,
        message='Waiting for cluster update operation',
        timeout_s=args.timeout)

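    # Re-fetch the cluster so the command returns its post-update state.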
    request = dataproc.messages.DataprocProjectsRegionsClustersGetRequest(
        projectId=cluster_ref.projectId,
        region=cluster_ref.region,
        clusterName=cluster_ref.clusterName)
    cluster = dataproc.client.projects_regions_clusters.Get(request)
    log.UpdatedResource(cluster_ref)
    return cluster


def _FirstNonNone(first, second):
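  """Returns first if it is not None, otherwise second."""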
  return first if first is not None else second