1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5
6import re
7import sys
8import time
9from distutils.version import StrictVersion
10from typing import Any, Dict, List, Tuple, TypeVar, Union
11
12from azure.cli.command_modules.acs._consts import (
13    ADDONS,
14    CONST_ACC_SGX_QUOTE_HELPER_ENABLED,
15    CONST_AZURE_POLICY_ADDON_NAME,
16    CONST_CONFCOM_ADDON_NAME,
17    CONST_HTTP_APPLICATION_ROUTING_ADDON_NAME,
18    CONST_INGRESS_APPGW_ADDON_NAME,
19    CONST_INGRESS_APPGW_APPLICATION_GATEWAY_ID,
20    CONST_INGRESS_APPGW_APPLICATION_GATEWAY_NAME,
21    CONST_INGRESS_APPGW_SUBNET_CIDR,
22    CONST_INGRESS_APPGW_SUBNET_ID,
23    CONST_INGRESS_APPGW_WATCH_NAMESPACE,
24    CONST_KUBE_DASHBOARD_ADDON_NAME,
25    CONST_MONITORING_ADDON_NAME,
26    CONST_MONITORING_LOG_ANALYTICS_WORKSPACE_RESOURCE_ID,
27    CONST_OUTBOUND_TYPE_LOAD_BALANCER,
28    CONST_OUTBOUND_TYPE_USER_DEFINED_ROUTING,
29    CONST_PRIVATE_DNS_ZONE_SYSTEM,
30    CONST_VIRTUAL_NODE_ADDON_NAME,
31    CONST_VIRTUAL_NODE_SUBNET_NAME,
32    CONST_OPEN_SERVICE_MESH_ADDON_NAME,
33    DecoratorMode,
34)
35from azure.cli.command_modules.acs.custom import (
36    _add_role_assignment,
37    _ensure_aks_acr,
38    _ensure_aks_service_principal,
39    _ensure_cluster_identity_permission_on_kubelet_identity,
40    _ensure_container_insights_for_monitoring,
41    _ensure_default_log_analytics_workspace_for_monitoring,
42    _get_rg_location,
43    _get_user_assigned_identity,
44    _put_managed_cluster_ensuring_permission,
45    create_load_balancer_profile,
46    set_load_balancer_sku,
47    subnet_role_assignment_exists,
48)
49from azure.cli.core import AzCommandsLoader
50from azure.cli.core._profile import Profile
51from azure.cli.core.azclierror import (
52    ArgumentUsageError,
53    CLIInternalError,
54    InvalidArgumentValueError,
55    MutuallyExclusiveArgumentError,
56    NoTTYError,
57    RequiredArgumentMissingError,
58    UnknownError,
59)
60from azure.cli.core.commands import AzCliCommand
61from azure.cli.core.keys import is_valid_ssh_rsa_public_key
62from azure.cli.core.profiles import ResourceType
63from azure.cli.core.util import truncate_text
64from knack.log import get_logger
65from knack.prompting import NoTTYException, prompt, prompt_pass, prompt_y_n
66from msrestazure.azure_exceptions import CloudError
67from msrestazure.tools import is_valid_resource_id
68
69logger = get_logger(__name__)
70
71# type variables
72ContainerServiceClient = TypeVar("ContainerServiceClient")
73Identity = TypeVar("Identity")
74ManagedCluster = TypeVar("ManagedCluster")
75ManagedClusterLoadBalancerProfile = TypeVar("ManagedClusterLoadBalancerProfile")
76ResourceReference = TypeVar("ResourceReference")
77
78# TODO
79# remove model loading for cluster_autoscaler_profile in _validators
80# add validation for all/some of the parameters involved in the getter of outbound_type/enable_addons
81
82
83def format_parameter_name_to_option_name(parameter_name: str) -> str:
84    """Convert a name in parameter format to option format.
85
86    Underscores ("_") are used to connect the various parts of a parameter name, while hyphens ("-") are used to connect
87    each part of an option name. Besides, the option name starts with double hyphens ("--").
88
89    :return: str
90    """
91    option_name = "--" + parameter_name.replace("_", "-")
92    return option_name
93
94
95def safe_list_get(li: List, idx: int, default: Any = None) -> Any:
96    """Get an element from a list without raising IndexError.
97
98    Attempt to get the element with index idx from a list-like object li, and if the index is invalid (such as out of
99    range), return default (whose default value is None).
100
101    :return: an element of any type
102    """
103    if isinstance(li, list):
104        try:
105            return li[idx]
106        except IndexError:
107            return default
108    return None
109
110
111def safe_lower(obj: Any) -> Any:
112    """Return lowercase string if the provided obj is a string, otherwise return the object itself.
113
114    :return: Any
115    """
116    if isinstance(obj, str):
117        return obj.lower()
118    return obj
119
120
121def validate_decorator_mode(decorator_mode) -> bool:
122    """Check if decorator_mode is a value of enum type DecoratorMode.
123
124    :return: bool
125    """
126    is_valid_decorator_mode = False
127    try:
128        is_valid_decorator_mode = decorator_mode in DecoratorMode
129    # will raise TypeError in Python >= 3.8
130    except TypeError:
131        pass
132
133    return is_valid_decorator_mode
134
135
136def check_is_msi_cluster(mc: ManagedCluster) -> bool:
137    """Check `mc` object to determine whether managed identity is enabled.
138
139    :return: bool
140    """
141    if mc and mc.identity and mc.identity.type is not None:
142        identity_type = mc.identity.type.casefold()
143        if identity_type in ("systemassigned", "userassigned"):
144            return True
145    return False
146
147
148def validate_counts_in_autoscaler(
149    node_count,
150    enable_cluster_autoscaler,
151    min_count,
152    max_count,
153    decorator_mode,
154) -> None:
155    """Check the validity of serveral count-related parameters in autoscaler.
156
157    On the premise that enable_cluster_autoscaler (in update mode, this could be update_cluster_autoscaler) is enabled,
158    it will check whether both min_count and max_count are  assigned, if not, raise the RequiredArgumentMissingError. If
159    min_count is less than max_count, raise the InvalidArgumentValueError. Only in create mode it will check whether the
160    value of node_count is between min_count and max_count, if not, raise the InvalidArgumentValueError. If
161    enable_cluster_autoscaler (in update mode, this could be update_cluster_autoscaler) is not enabled, it will check
162    whether any of min_count or max_count is assigned, if so, raise the RequiredArgumentMissingError.
163
164    :return: None
165    """
166    # validation
167    if enable_cluster_autoscaler:
168        if min_count is None or max_count is None:
169            raise RequiredArgumentMissingError(
170                "Please specify both min-count and max-count when --enable-cluster-autoscaler enabled"
171            )
172        if min_count > max_count:
173            raise InvalidArgumentValueError(
174                "Value of min-count should be less than or equal to value of max-count"
175            )
176        if decorator_mode == DecoratorMode.CREATE:
177            if node_count < min_count or node_count > max_count:
178                raise InvalidArgumentValueError(
179                    "node-count is not in the range of min-count and max-count"
180                )
181    else:
182        if min_count is not None or max_count is not None:
183            option_name = "--enable-cluster-autoscaler"
184            if decorator_mode == DecoratorMode.UPDATE:
185                option_name += " or --update-cluster-autoscaler"
186            raise RequiredArgumentMissingError(
187                "min-count and max-count are required for {}, please use the flag".format(
188                    option_name
189                )
190            )
191
192
193# pylint: disable=too-many-instance-attributes,too-few-public-methods
194class AKSModels:
195    """Store the models used in aks_create.
196
197    The api version of the class corresponding to a model is determined by resource_type.
198    """
199    def __init__(
200        self,
201        cmd: AzCommandsLoader,
202        resource_type: ResourceType = ResourceType.MGMT_CONTAINERSERVICE,
203    ):
204        self.__cmd = cmd
205        self.resource_type = resource_type
206        self.ManagedCluster = self.__cmd.get_models(
207            "ManagedCluster",
208            resource_type=self.resource_type,
209            operation_group="managed_clusters",
210        )
211        self.ManagedClusterWindowsProfile = self.__cmd.get_models(
212            "ManagedClusterWindowsProfile",
213            resource_type=self.resource_type,
214            operation_group="managed_clusters",
215        )
216        self.ManagedClusterSKU = self.__cmd.get_models(
217            "ManagedClusterSKU",
218            resource_type=self.resource_type,
219            operation_group="managed_clusters",
220        )
221        self.ContainerServiceNetworkProfile = self.__cmd.get_models(
222            "ContainerServiceNetworkProfile",
223            resource_type=self.resource_type,
224            operation_group="managed_clusters",
225        )
226        self.ContainerServiceLinuxProfile = self.__cmd.get_models(
227            "ContainerServiceLinuxProfile",
228            resource_type=self.resource_type,
229            operation_group="managed_clusters",
230        )
231        self.ManagedClusterServicePrincipalProfile = self.__cmd.get_models(
232            "ManagedClusterServicePrincipalProfile",
233            resource_type=self.resource_type,
234            operation_group="managed_clusters",
235        )
236        self.ContainerServiceSshConfiguration = self.__cmd.get_models(
237            "ContainerServiceSshConfiguration",
238            resource_type=self.resource_type,
239            operation_group="managed_clusters",
240        )
241        self.ContainerServiceSshPublicKey = self.__cmd.get_models(
242            "ContainerServiceSshPublicKey",
243            resource_type=self.resource_type,
244            operation_group="managed_clusters",
245        )
246        self.ManagedClusterAADProfile = self.__cmd.get_models(
247            "ManagedClusterAADProfile",
248            resource_type=self.resource_type,
249            operation_group="managed_clusters",
250        )
251        self.ManagedClusterAutoUpgradeProfile = self.__cmd.get_models(
252            "ManagedClusterAutoUpgradeProfile",
253            resource_type=self.resource_type,
254            operation_group="managed_clusters",
255        )
256        self.ManagedClusterAgentPoolProfile = self.__cmd.get_models(
257            "ManagedClusterAgentPoolProfile",
258            resource_type=self.resource_type,
259            operation_group="managed_clusters",
260        )
261        self.ManagedClusterIdentity = self.__cmd.get_models(
262            "ManagedClusterIdentity",
263            resource_type=self.resource_type,
264            operation_group="managed_clusters",
265        )
266        self.UserAssignedIdentity = self.__cmd.get_models(
267            "UserAssignedIdentity",
268            resource_type=self.resource_type,
269            operation_group="managed_clusters",
270        )
271        self.ManagedServiceIdentityUserAssignedIdentitiesValue = (
272            self.__cmd.get_models(
273                "ManagedServiceIdentityUserAssignedIdentitiesValue",
274                resource_type=self.resource_type,
275                operation_group="managed_clusters",
276            )
277        )
278        self.ManagedClusterAddonProfile = self.__cmd.get_models(
279            "ManagedClusterAddonProfile",
280            resource_type=self.resource_type,
281            operation_group="managed_clusters",
282        )
283        self.ManagedClusterAPIServerAccessProfile = self.__cmd.get_models(
284            "ManagedClusterAPIServerAccessProfile",
285            resource_type=self.resource_type,
286            operation_group="managed_clusters",
287        )
288        self.ExtendedLocation = self.__cmd.get_models(
289            "ExtendedLocation",
290            resource_type=self.resource_type,
291            operation_group="managed_clusters",
292        )
293        self.ExtendedLocationTypes = self.__cmd.get_models(
294            "ExtendedLocationTypes",
295            resource_type=self.resource_type,
296            operation_group="managed_clusters",
297        )
298        # not directly used
299        self.ManagedClusterPropertiesAutoScalerProfile = self.__cmd.get_models(
300            "ManagedClusterPropertiesAutoScalerProfile",
301            resource_type=self.resource_type,
302            operation_group="managed_clusters",
303        )
304        # init load balancer models
305        self.init_lb_models()
306
307    def init_lb_models(self) -> None:
308        """Initialize models used by load balancer.
309
310        The models are stored in a dictionary, the key is the model name and the value is the model type.
311
312        :return: None
313        """
314        lb_models = {}
315        lb_models["ManagedClusterLoadBalancerProfile"] = self.__cmd.get_models(
316            "ManagedClusterLoadBalancerProfile",
317            resource_type=self.resource_type,
318            operation_group="managed_clusters",
319        )
320        lb_models[
321            "ManagedClusterLoadBalancerProfileManagedOutboundIPs"
322        ] = self.__cmd.get_models(
323            "ManagedClusterLoadBalancerProfileManagedOutboundIPs",
324            resource_type=self.resource_type,
325            operation_group="managed_clusters",
326        )
327        lb_models[
328            "ManagedClusterLoadBalancerProfileOutboundIPs"
329        ] = self.__cmd.get_models(
330            "ManagedClusterLoadBalancerProfileOutboundIPs",
331            resource_type=self.resource_type,
332            operation_group="managed_clusters",
333        )
334        lb_models[
335            "ManagedClusterLoadBalancerProfileOutboundIPPrefixes"
336        ] = self.__cmd.get_models(
337            "ManagedClusterLoadBalancerProfileOutboundIPPrefixes",
338            resource_type=self.resource_type,
339            operation_group="managed_clusters",
340        )
341        lb_models["ResourceReference"] = self.__cmd.get_models(
342            "ResourceReference",
343            resource_type=self.resource_type,
344            operation_group="managed_clusters",
345        )
346        self.lb_models = lb_models
347        # Note: Uncomment the followings to add these models as class attributes.
348        # for model_name, model_type in lb_models.items():
349        #     setattr(self, model_name, model_type)
350
351
352# pylint: disable=too-many-public-methods
353class AKSContext:
354    """Implement getter functions for all parameters in aks_create.
355
356    Note: One of the most basic principles is that when parameters are put into a certain profile (and further
357    decorated into the ManagedCluster object by AKSCreateDecorator), it shouldn't be modified any more, only
358    read-only operations (e.g. validation) can be performed.
359
360    This class also stores a copy of the original function parameters, some intermediate variables (such as the
361    subscription ID) and a reference of the ManagedCluster object.
362
363    When adding a new parameter for aks_create, please also provide a "getter" function named `get_xxx`, where `xxx` is
364    the parameter name. In this function, the process of obtaining parameter values, dynamic completion (optional),
365    and validation (optional) should be followed. The obtaining of parameter values should further follow the order
366    of obtaining from the ManagedCluster object or from the original value.
367
368    Attention: In case of checking the validity of parameters, make sure enable_validation is never set to True and
369    read_only is set to True when necessary to avoid loop calls, when using the getter function to obtain the value of
370    other parameters.
371    """
372    def __init__(self, cmd: AzCliCommand, raw_parameters: Dict, decorator_mode):
373        if not isinstance(raw_parameters, dict):
374            raise CLIInternalError(
375                "Unexpected raw_parameters object with type '{}'.".format(
376                    type(raw_parameters)
377                )
378            )
379        if not validate_decorator_mode(decorator_mode):
380            raise CLIInternalError(
381                "Unexpected decorator_mode '{}' with type '{}'.".format(
382                    decorator_mode, type(decorator_mode)
383                )
384            )
385        self.cmd = cmd
386        self.raw_param = raw_parameters
387        self.decorator_mode = decorator_mode
388        self.intermediates = dict()
389        self.mc = None
390
391    def attach_mc(self, mc: ManagedCluster) -> None:
392        """Attach the ManagedCluster object to the context.
393
394        The `mc` object is only allowed to be attached once, and attaching again will raise a CLIInternalError.
395
396        :return: None
397        """
398        if self.mc is None:
399            self.mc = mc
400        else:
401            msg = "the same" if self.mc == mc else "different"
402            raise CLIInternalError(
403                "Attempting to attach the `mc` object again, the two objects are {}.".format(
404                    msg
405                )
406            )
407
408    def get_intermediate(self, variable_name: str, default_value: Any = None) -> Any:
409        """Get the value of an intermediate by its name.
410
411        Get the value from the intermediates dictionary with variable_name as the key. If variable_name does not exist,
412        default_value will be returned.
413
414        :return: Any
415        """
416        if variable_name not in self.intermediates:
417            msg = "The intermediate '{}' does not exist, return default value '{}'.".format(
418                variable_name, default_value
419            )
420            logger.debug(msg)
421        return self.intermediates.get(variable_name, default_value)
422
423    def set_intermediate(
424        self, variable_name: str, value: Any, overwrite_exists: bool = False
425    ) -> None:
426        """Set the value of an intermediate by its name.
427
428        In the case that the intermediate value already exists, if overwrite_exists is enabled, the value will be
429        overwritten and the log will be output at the debug level, otherwise the value will not be overwritten and
430        the log will be output at the warning level, which by default will be output to stderr and seen by user.
431
432        :return: None
433        """
434        if variable_name in self.intermediates:
435            if overwrite_exists:
436                msg = "The intermediate '{}' is overwritten. Original value: '{}', new value: '{}'.".format(
437                    variable_name, self.intermediates.get(variable_name), value
438                )
439                logger.debug(msg)
440                self.intermediates[variable_name] = value
441            elif self.intermediates.get(variable_name) != value:
442                msg = "The intermediate '{}' already exists, but overwrite is not enabled. " \
443                    "Original value: '{}', candidate value: '{}'.".format(
444                        variable_name,
445                        self.intermediates.get(variable_name),
446                        value,
447                    )
448                # warning level log will be output to the console, which may cause confusion to users
449                logger.warning(msg)
450        else:
451            self.intermediates[variable_name] = value
452
453    def remove_intermediate(self, variable_name: str) -> None:
454        """Remove the value of an intermediate by its name.
455
456        No exception will be raised if the intermediate does not exist.
457
458        :return: None
459        """
460        self.intermediates.pop(variable_name, None)
461
462    def get_subscription_id(self):
463        """Helper function to obtain the value of subscription_id.
464
465        Note: This is not a parameter of aks_create, and it will not be decorated into the `mc` object.
466
467        If no corresponding intermediate exists, method "get_subscription_id" of class "Profile" will be called, which
468        depends on "az login" in advance, the returned subscription_id will be stored as an intermediate.
469
470        :return: string
471        """
472        subscription_id = self.get_intermediate("subscription_id", None)
473        if not subscription_id:
474            subscription_id = self.cmd.cli_ctx.data.get('subscription_id')
475            if not subscription_id:
476                subscription_id = Profile(cli_ctx=self.cmd.cli_ctx).get_subscription_id()
477                self.cmd.cli_ctx.data['subscription_id'] = subscription_id
478            self.set_intermediate("subscription_id", subscription_id, overwrite_exists=True)
479        return subscription_id
480
481    def get_resource_group_name(self) -> str:
482        """Obtain the value of resource_group_name.
483
484        Note: resource_group_name will not be decorated into the `mc` object.
485
486        The value of this parameter should be provided by user explicitly.
487
488        :return: string
489        """
490        # read the original value passed by the command
491        resource_group_name = self.raw_param.get("resource_group_name")
492
493        # this parameter does not need dynamic completion
494        # this parameter does not need validation
495        return resource_group_name
496
497    def get_name(self) -> str:
498        """Obtain the value of name.
499
500        Note: name will not be decorated into the `mc` object.
501
502        The value of this parameter should be provided by user explicitly.
503
504        :return: string
505        """
506        # read the original value passed by the command
507        name = self.raw_param.get("name")
508
509        # this parameter does not need dynamic completion
510        # this parameter does not need validation
511        return name
512
513    # pylint: disable=unused-argument
514    def _get_location(self, read_only: bool = False, **kwargs) -> Union[str, None]:
515        """Internal function to dynamically obtain the value of location according to the context.
516
517        When location is not assigned, dynamic completion will be triggerd. Function "_get_rg_location" will be called
518        to get the location of the provided resource group, which internally used ResourceManagementClient to send
519        the request.
520
521        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
522
523        :return: string or None
524        """
525        # read the original value passed by the command
526        location = self.raw_param.get("location")
527        # try to read the property value corresponding to the parameter from the `mc` object
528        read_from_mc = False
529        if self.mc and self.mc.location is not None:
530            location = self.mc.location
531            read_from_mc = True
532
533        # skip dynamic completion & validation if option read_only is specified
534        if read_only:
535            return location
536
537        # dynamic completion
538        if not read_from_mc and location is None:
539            location = _get_rg_location(
540                self.cmd.cli_ctx, self.get_resource_group_name()
541            )
542
543        # this parameter does not need validation
544        return location
545
546    def get_location(self) -> Union[str, None]:
547        """Dynamically obtain the value of location according to the context.
548
549        When location is not assigned, dynamic completion will be triggerd. Function "_get_rg_location" will be called
550        to get the location of the provided resource group, which internally used ResourceManagementClient to send
551        the request.
552
553        :return: string or None
554        """
555
556        return self._get_location()
557
558    def get_ssh_key_value_and_no_ssh_key(self) -> Tuple[str, bool]:
559        """Obtain the value of ssh_key_value and no_ssh_key.
560
561        Note: no_ssh_key will not be decorated into the `mc` object.
562
563        If the user does not explicitly specify --ssh-key-value, the validator function "validate_ssh_key" will check
564        the default file location "~/.ssh/id_rsa.pub", if the file exists, read its content and return. Otherise,
565        create a key pair at "~/.ssh/id_rsa.pub" and return the public key.
566        If the user provides a string-like input for --ssh-key-value, the validator function "validate_ssh_key" will
567        check whether it is a file path, if so, read its content and return; if it is a valid public key, return it.
568        Otherwise, create a key pair there and return the public key.
569
570        This function will verify the parameters by default. It will verify the validity of ssh_key_value. If parameter
571        no_ssh_key is set to True, verification will be skipped. Otherwise, an InvalidArgumentValueError will be raised
572        when the value of ssh_key_value is invalid.
573
574        :return: a tuple containing two elements: ssh_key_value of string type and no_ssh_key of bool type
575        """
576        # ssh_key_value
577        # read the original value passed by the command
578        raw_value = self.raw_param.get("ssh_key_value")
579        # try to read the property value corresponding to the parameter from the `mc` object
580        value_obtained_from_mc = None
581        if (
582            self.mc and
583            self.mc.linux_profile and
584            self.mc.linux_profile.ssh and
585            self.mc.linux_profile.ssh.public_keys
586        ):
587            public_key_obj = safe_list_get(
588                self.mc.linux_profile.ssh.public_keys, 0, None
589            )
590            if public_key_obj:
591                value_obtained_from_mc = public_key_obj.key_data
592
593        # set default value
594        read_from_mc = False
595        if value_obtained_from_mc is not None:
596            ssh_key_value = value_obtained_from_mc
597            read_from_mc = True
598        else:
599            ssh_key_value = raw_value
600
601        # no_ssh_key
602        # read the original value passed by the command
603        no_ssh_key = self.raw_param.get("no_ssh_key")
604
605        # consistent check
606        if read_from_mc and no_ssh_key:
607            raise CLIInternalError(
608                "Inconsistent state detected, ssh_key_value is read from the `mc` object while no_ssh_key is enabled."
609            )
610
611        # these parameters do not need dynamic completion
612
613        # validation
614        if not no_ssh_key:
615            try:
616                if not ssh_key_value or not is_valid_ssh_rsa_public_key(
617                    ssh_key_value
618                ):
619                    raise ValueError()
620            except (TypeError, ValueError):
621                shortened_key = truncate_text(ssh_key_value)
622                raise InvalidArgumentValueError(
623                    "Provided ssh key ({}) is invalid or non-existent".format(
624                        shortened_key
625                    )
626                )
627        return ssh_key_value, no_ssh_key
628
629    # pylint: disable=unused-argument
630    def _get_dns_name_prefix(
631        self, enable_validation: bool = False, read_only: bool = False, **kwargs
632    ) -> Union[str, None]:
633        """Internal function to dynamically obtain the value of dns_name_prefix according to the context.
634
635        When both dns_name_prefix and fqdn_subdomain are not assigned, dynamic completion will be triggerd. A default
636        dns_name_prefix composed of name (cluster), resource_group_name, and subscription_id will be created.
637
638        This function supports the option of enable_validation. When enabled, it will check if both dns_name_prefix and
639        fqdn_subdomain are assigend, if so, raise the MutuallyExclusiveArgumentError.
640        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
641
642        :return: string or None
643        """
644        # read the original value passed by the command
645        dns_name_prefix = self.raw_param.get("dns_name_prefix")
646        # try to read the property value corresponding to the parameter from the `mc` object
647        read_from_mc = False
648        if self.mc and self.mc.dns_prefix is not None:
649            dns_name_prefix = self.mc.dns_prefix
650            read_from_mc = True
651
652        # skip dynamic completion & validation if option read_only is specified
653        if read_only:
654            return dns_name_prefix
655
656        dynamic_completion = False
657        # check whether the parameter meet the conditions of dynamic completion
658        if not dns_name_prefix and not self._get_fqdn_subdomain(enable_validation=False):
659            dynamic_completion = True
660        # disable dynamic completion if the value is read from `mc`
661        dynamic_completion = dynamic_completion and not read_from_mc
662        # In case the user does not specify the parameter and it meets the conditions of automatic completion,
663        # necessary information is dynamically completed.
664        if dynamic_completion:
665            name = self.get_name()
666            resource_group_name = self.get_resource_group_name()
667            subscription_id = self.get_subscription_id()
668            # Use subscription id to provide uniqueness and prevent DNS name clashes
669            name_part = re.sub('[^A-Za-z0-9-]', '', name)[0:10]
670            if not name_part[0].isalpha():
671                name_part = (str('a') + name_part)[0:10]
672            resource_group_part = re.sub(
673                '[^A-Za-z0-9-]', '', resource_group_name)[0:16]
674            dns_name_prefix = '{}-{}-{}'.format(name_part, resource_group_part, subscription_id[0:6])
675
676        # validation
677        if enable_validation:
678            if dns_name_prefix and self._get_fqdn_subdomain(enable_validation=False):
679                raise MutuallyExclusiveArgumentError(
680                    "--dns-name-prefix and --fqdn-subdomain cannot be used at same time"
681                )
682        return dns_name_prefix
683
684    def get_dns_name_prefix(self) -> Union[str, None]:
685        """Dynamically obtain the value of dns_name_prefix according to the context.
686
687        When both dns_name_prefix and fqdn_subdomain are not assigned, dynamic completion will be triggerd. A default
688        dns_name_prefix composed of name (cluster), resource_group_name, and subscription_id will be created.
689
690        This function will verify the parameter by default. It will check if both dns_name_prefix and fqdn_subdomain
691        are assigend, if so, raise the MutuallyExclusiveArgumentError.
692
693        :return: string or None
694        """
695
696        return self._get_dns_name_prefix(enable_validation=True)
697
698    def get_kubernetes_version(self) -> str:
699        """Obtain the value of kubernetes_version.
700
701        :return: string
702        """
703        # read the original value passed by the command
704        kubernetes_version = self.raw_param.get("kubernetes_version")
705        # try to read the property value corresponding to the parameter from the `mc` object
706        if self.mc and self.mc.kubernetes_version is not None:
707            kubernetes_version = self.mc.kubernetes_version
708
709        # this parameter does not need dynamic completion
710        # this parameter does not need validation
711        return kubernetes_version
712
713    # pylint: disable=unused-argument
714    def _get_vm_set_type(self, read_only: bool = False, **kwargs) -> Union[str, None]:
715        """Internal function to dynamically obtain the value of vm_set_type according to the context.
716
717        Dynamic completion will be triggerd by default. The value of vm set type will be set according to the value of
718        kubernetes_version. It will also normalize the value as server validation is case-sensitive.
719
720        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
721
722        :return: string or None
723        """
724        # read the original value passed by the command
725        raw_value = self.raw_param.get("vm_set_type")
726        # try to read the property value corresponding to the parameter from the `mc` object
727        value_obtained_from_mc = None
728        if self.mc and self.mc.agent_pool_profiles:
729            agent_pool_profile = safe_list_get(
730                self.mc.agent_pool_profiles, 0, None
731            )
732            if agent_pool_profile:
733                value_obtained_from_mc = agent_pool_profile.type
734
735        # set default value
736        read_from_mc = False
737        if value_obtained_from_mc is not None:
738            vm_set_type = value_obtained_from_mc
739            read_from_mc = True
740        else:
741            vm_set_type = raw_value
742
743        # skip dynamic completion & validation if option read_only is specified
744        if read_only:
745            return vm_set_type
746
747        # dynamic completion
748        # the value verified by the validator may have case problems, and we will adjust it by default
749        if not read_from_mc:
750            kubernetes_version = self.get_kubernetes_version()
751            if not vm_set_type:
752                if kubernetes_version and StrictVersion(kubernetes_version) < StrictVersion("1.12.9"):
753                    print(
754                        "Setting vm_set_type to availabilityset as it is not specified and kubernetes version({}) "
755                        "less than 1.12.9 only supports availabilityset\n".format(
756                            kubernetes_version
757                        )
758                    )
759                    vm_set_type = "AvailabilitySet"
760            if not vm_set_type:
761                vm_set_type = "VirtualMachineScaleSets"
762
763            # normalize as server validation is case-sensitive
764            if vm_set_type.lower() == "AvailabilitySet".lower():
765                vm_set_type = "AvailabilitySet"
766            if vm_set_type.lower() == "VirtualMachineScaleSets".lower():
767                vm_set_type = "VirtualMachineScaleSets"
768            return vm_set_type
769
770        # this parameter does not need validation
771        return vm_set_type
772
773    def get_vm_set_type(self) -> Union[str, None]:
774        """Dynamically obtain the value of vm_set_type according to the context.
775
776        Dynamic completion will be triggerd by default. The value of vm set type will be set according to the value of
777        kubernetes_version. It will also normalize the value as server validation is case-sensitive.
778
779        :return: string or None
780        """
781
782        # this parameter does not need validation
783        return self._get_vm_set_type()
784
785    def get_nodepool_name(self) -> str:
786        """Dynamically obtain the value of nodepool_name according to the context.
787
788        Note: SDK performs the following validation {'required': True, 'pattern': r'^[a-z][a-z0-9]{0,11}$'}.
789
790        This function will normalize the parameter by default. If no value is assigned, the default value "nodepool1"
791        is set, and if the string length is greater than 12, it is truncated.
792
793        :return: string
794        """
795        # read the original value passed by the command
796        raw_value = self.raw_param.get("nodepool_name")
797        # try to read the property value corresponding to the parameter from the `mc` object
798        value_obtained_from_mc = None
799        if self.mc and self.mc.agent_pool_profiles:
800            agent_pool_profile = safe_list_get(
801                self.mc.agent_pool_profiles, 0, None
802            )
803            if agent_pool_profile:
804                value_obtained_from_mc = agent_pool_profile.name
805
806        # set default value
807        if value_obtained_from_mc is not None:
808            nodepool_name = value_obtained_from_mc
809        else:
810            nodepool_name = raw_value
811            # normalize
812            if not nodepool_name:
813                nodepool_name = "nodepool1"
814            else:
815                nodepool_name = nodepool_name[:12]
816
817        # this parameter does not need validation
818        return nodepool_name
819
820    def get_nodepool_tags(self) -> Union[Dict[str, str], None]:
821        """Obtain the value of nodepool_tags.
822
823        :return: dictionary or None
824        """
825        # read the original value passed by the command
826        raw_value = self.raw_param.get("nodepool_tags")
827        # try to read the property value corresponding to the parameter from the `mc` object
828        value_obtained_from_mc = None
829        if self.mc and self.mc.agent_pool_profiles:
830            agent_pool_profile = safe_list_get(
831                self.mc.agent_pool_profiles, 0, None
832            )
833            if agent_pool_profile:
834                value_obtained_from_mc = agent_pool_profile.tags
835
836        # set default value
837        if value_obtained_from_mc is not None:
838            nodepool_tags = value_obtained_from_mc
839        else:
840            nodepool_tags = raw_value
841
842        # this parameter does not need dynamic completion
843        # this parameter does not need validation
844        return nodepool_tags
845
846    def get_nodepool_labels(self) -> Union[Dict[str, str], None]:
847        """Obtain the value of nodepool_labels.
848
849        :return: dictionary or None
850        """
851        # read the original value passed by the command
852        raw_value = self.raw_param.get("nodepool_labels")
853        # try to read the property value corresponding to the parameter from the `mc` object
854        value_obtained_from_mc = None
855        if self.mc and self.mc.agent_pool_profiles:
856            agent_pool_profile = safe_list_get(
857                self.mc.agent_pool_profiles, 0, None
858            )
859            if agent_pool_profile:
860                value_obtained_from_mc = agent_pool_profile.node_labels
861
862        # set default value
863        if value_obtained_from_mc is not None:
864            nodepool_labels = value_obtained_from_mc
865        else:
866            nodepool_labels = raw_value
867
868        # this parameter does not need dynamic completion
869        # this parameter does not need validation
870        return nodepool_labels
871
872    def get_node_vm_size(self) -> str:
873        """Obtain the value of node_vm_size.
874
875        :return: string
876        """
877        # read the original value passed by the command
878        raw_value = self.raw_param.get("node_vm_size")
879        # try to read the property value corresponding to the parameter from the `mc` object
880        value_obtained_from_mc = None
881        if self.mc and self.mc.agent_pool_profiles:
882            agent_pool_profile = safe_list_get(
883                self.mc.agent_pool_profiles, 0, None
884            )
885            if agent_pool_profile:
886                value_obtained_from_mc = agent_pool_profile.vm_size
887
888        # set default value
889        if value_obtained_from_mc is not None:
890            node_vm_size = value_obtained_from_mc
891        else:
892            node_vm_size = raw_value
893
894        # this parameter does not need dynamic completion
895        # this parameter does not need validation
896        return node_vm_size
897
898    def get_vnet_subnet_id(self) -> Union[str, None]:
899        """Obtain the value of vnet_subnet_id.
900
901        :return: string or None
902        """
903        # read the original value passed by the command
904        raw_value = self.raw_param.get("vnet_subnet_id")
905        # try to read the property value corresponding to the parameter from the `mc` object
906        value_obtained_from_mc = None
907        if self.mc and self.mc.agent_pool_profiles:
908            agent_pool_profile = safe_list_get(
909                self.mc.agent_pool_profiles, 0, None
910            )
911            if agent_pool_profile:
912                value_obtained_from_mc = agent_pool_profile.vnet_subnet_id
913
914        # set default value
915        if value_obtained_from_mc is not None:
916            vnet_subnet_id = value_obtained_from_mc
917        else:
918            vnet_subnet_id = raw_value
919
920        # this parameter does not need dynamic completion
921        # this parameter does not need validation
922        return vnet_subnet_id
923
924    def get_ppg(self) -> Union[str, None]:
925        """Obtain the value of ppg (proximity_placement_group_id).
926
927        :return: string or None
928        """
929        # read the original value passed by the command
930        raw_value = self.raw_param.get("ppg")
931        # try to read the property value corresponding to the parameter from the `mc` object
932        value_obtained_from_mc = None
933        if self.mc and self.mc.agent_pool_profiles:
934            agent_pool_profile = safe_list_get(
935                self.mc.agent_pool_profiles, 0, None
936            )
937            if agent_pool_profile:
938                value_obtained_from_mc = (
939                    agent_pool_profile.proximity_placement_group_id
940                )
941
942        # set default value
943        if value_obtained_from_mc is not None:
944            ppg = value_obtained_from_mc
945        else:
946            ppg = raw_value
947
948        # this parameter does not need dynamic completion
949        # this parameter does not need validation
950        return ppg
951
952    def get_zones(self) -> Union[List[str], None]:
953        """Obtain the value of zones.
954
955        :return: list of strings or None
956        """
957        # read the original value passed by the command
958        raw_value = self.raw_param.get("zones")
959        # try to read the property value corresponding to the parameter from the `mc` object
960        value_obtained_from_mc = None
961        if self.mc and self.mc.agent_pool_profiles:
962            agent_pool_profile = safe_list_get(
963                self.mc.agent_pool_profiles, 0, None
964            )
965            if agent_pool_profile:
966                value_obtained_from_mc = agent_pool_profile.availability_zones
967
968        # set default value
969        if value_obtained_from_mc is not None:
970            zones = value_obtained_from_mc
971        else:
972            zones = raw_value
973
974        # this parameter does not need dynamic completion
975        # this parameter does not need validation
976        return zones
977
978    def get_enable_node_public_ip(self) -> bool:
979        """Obtain the value of enable_node_public_ip.
980
981        :return: bool
982        """
983        # read the original value passed by the command
984        raw_value = self.raw_param.get("enable_node_public_ip")
985        # try to read the property value corresponding to the parameter from the `mc` object
986        value_obtained_from_mc = None
987        if self.mc and self.mc.agent_pool_profiles:
988            agent_pool_profile = safe_list_get(
989                self.mc.agent_pool_profiles, 0, None
990            )
991            if agent_pool_profile:
992                value_obtained_from_mc = (
993                    agent_pool_profile.enable_node_public_ip
994                )
995
996        # set default value
997        if value_obtained_from_mc is not None:
998            enable_node_public_ip = value_obtained_from_mc
999        else:
1000            enable_node_public_ip = raw_value
1001
1002        # this parameter does not need dynamic completion
1003        # this parameter does not need validation
1004        return enable_node_public_ip
1005
1006    def get_node_public_ip_prefix_id(self) -> Union[str, None]:
1007        """Obtain the value of node_public_ip_prefix_id.
1008
1009        :return: string or None
1010        """
1011        # read the original value passed by the command
1012        raw_value = self.raw_param.get("node_public_ip_prefix_id")
1013        # try to read the property value corresponding to the parameter from the `mc` object
1014        value_obtained_from_mc = None
1015        if self.mc and self.mc.agent_pool_profiles:
1016            agent_pool_profile = safe_list_get(
1017                self.mc.agent_pool_profiles, 0, None
1018            )
1019            if agent_pool_profile:
1020                value_obtained_from_mc = (
1021                    agent_pool_profile.node_public_ip_prefix_id
1022                )
1023
1024        # set default value
1025        if value_obtained_from_mc is not None:
1026            node_public_ip_prefix_id = value_obtained_from_mc
1027        else:
1028            node_public_ip_prefix_id = raw_value
1029
1030        # this parameter does not need dynamic completion
1031        # this parameter does not need validation
1032        return node_public_ip_prefix_id
1033
1034    def get_enable_encryption_at_host(self) -> bool:
1035        """Obtain the value of enable_encryption_at_host.
1036
1037        :return: bool
1038        """
1039        # read the original value passed by the command
1040        raw_value = self.raw_param.get("enable_encryption_at_host")
1041        # try to read the property value corresponding to the parameter from the `mc` object
1042        value_obtained_from_mc = None
1043        if self.mc and self.mc.agent_pool_profiles:
1044            agent_pool_profile = safe_list_get(
1045                self.mc.agent_pool_profiles, 0, None
1046            )
1047            if agent_pool_profile:
1048                value_obtained_from_mc = (
1049                    agent_pool_profile.enable_encryption_at_host
1050                )
1051
1052        # set default value
1053        if value_obtained_from_mc is not None:
1054            enable_encryption_at_host = value_obtained_from_mc
1055        else:
1056            enable_encryption_at_host = raw_value
1057
1058        # this parameter does not need dynamic completion
1059        # this parameter does not need validation
1060        return enable_encryption_at_host
1061
1062    def get_enable_ultra_ssd(self) -> bool:
1063        """Obtain the value of enable_ultra_ssd.
1064
1065        :return: bool
1066        """
1067        # read the original value passed by the command
1068        raw_value = self.raw_param.get("enable_ultra_ssd")
1069        # try to read the property value corresponding to the parameter from the `mc` object
1070        value_obtained_from_mc = None
1071        if self.mc and self.mc.agent_pool_profiles:
1072            agent_pool_profile = safe_list_get(
1073                self.mc.agent_pool_profiles, 0, None
1074            )
1075            if agent_pool_profile:
1076                value_obtained_from_mc = agent_pool_profile.enable_ultra_ssd
1077
1078        # set default value
1079        if value_obtained_from_mc is not None:
1080            enable_ultra_ssd = value_obtained_from_mc
1081        else:
1082            enable_ultra_ssd = raw_value
1083
1084        # this parameter does not need dynamic completion
1085        # this parameter does not need validation
1086        return enable_ultra_ssd
1087
1088    def get_max_pods(self) -> Union[int, None]:
1089        """Obtain the value of max_pods.
1090
1091        This function will normalize the parameter by default. The parameter will be converted to int, but int 0 is
1092        converted to None.
1093
1094        :return: int or None
1095        """
1096        # read the original value passed by the command
1097        raw_value = self.raw_param.get("max_pods")
1098        # try to read the property value corresponding to the parameter from the `mc` object
1099        value_obtained_from_mc = None
1100        if self.mc and self.mc.agent_pool_profiles:
1101            agent_pool_profile = safe_list_get(
1102                self.mc.agent_pool_profiles, 0, None
1103            )
1104            if agent_pool_profile:
1105                value_obtained_from_mc = agent_pool_profile.max_pods
1106
1107        # set default value
1108        if value_obtained_from_mc is not None:
1109            max_pods = value_obtained_from_mc
1110        else:
1111            max_pods = raw_value
1112            # Note: int 0 is converted to None
1113            if max_pods:
1114                max_pods = int(max_pods)
1115            else:
1116                max_pods = None
1117
1118        # this parameter does not need validation
1119        return max_pods
1120
1121    def get_node_osdisk_size(self) -> Union[int, None]:
1122        """Obtain the value of node_osdisk_size.
1123
1124        Note: SDK performs the following validation {'maximum': 2048, 'minimum': 0}.
1125
1126        This function will normalize the parameter by default. The parameter will be converted to int, but int 0 is
1127        converted to None.
1128
1129        :return: int or None
1130        """
1131        # read the original value passed by the command
1132        raw_value = self.raw_param.get("node_osdisk_size")
1133        # try to read the property value corresponding to the parameter from the `mc` object
1134        value_obtained_from_mc = None
1135        if self.mc and self.mc.agent_pool_profiles:
1136            agent_pool_profile = safe_list_get(
1137                self.mc.agent_pool_profiles, 0, None
1138            )
1139            if agent_pool_profile:
1140                value_obtained_from_mc = agent_pool_profile.os_disk_size_gb
1141
1142        # set default value
1143        if value_obtained_from_mc is not None:
1144            node_osdisk_size = value_obtained_from_mc
1145        else:
1146            node_osdisk_size = raw_value
1147            # Note: 0 is converted to None
1148            if node_osdisk_size:
1149                node_osdisk_size = int(node_osdisk_size)
1150            else:
1151                node_osdisk_size = None
1152
1153        # this parameter does not need validation
1154        return node_osdisk_size
1155
1156    def get_node_osdisk_type(self) -> Union[str, None]:
1157        """Obtain the value of node_osdisk_type.
1158
1159        :return: string or None
1160        """
1161        # read the original value passed by the command
1162        raw_value = self.raw_param.get("node_osdisk_type")
1163        # try to read the property value corresponding to the parameter from the `mc` object
1164        value_obtained_from_mc = None
1165        if self.mc and self.mc.agent_pool_profiles:
1166            agent_pool_profile = safe_list_get(
1167                self.mc.agent_pool_profiles, 0, None
1168            )
1169            if agent_pool_profile:
1170                value_obtained_from_mc = agent_pool_profile.os_disk_type
1171
1172        # set default value
1173        if value_obtained_from_mc is not None:
1174            node_osdisk_type = value_obtained_from_mc
1175        else:
1176            node_osdisk_type = raw_value
1177
1178        # this parameter does not need dynamic completion
1179        # this parameter does not need validation
1180        return node_osdisk_type
1181
1182    # pylint: disable=too-many-branches
1183    def get_node_count_and_enable_cluster_autoscaler_and_min_count_and_max_count(
1184        self,
1185    ) -> Tuple[int, bool, Union[int, None], Union[int, None]]:
1186        """Obtain the value of node_count, enable_cluster_autoscaler, min_count and max_count.
1187
1188        This function will verify the parameters through function "validate_counts_in_autoscaler" by default.
1189
1190        :return: a tuple containing four elements: node_count of int type, enable_cluster_autoscaler of bool type,
1191        min_count of int type or None and max_count of int type or None
1192        """
1193        # get agent pool profile from `mc`
1194        agent_pool_profile = None
1195        if self.mc and self.mc.agent_pool_profiles:
1196            agent_pool_profile = safe_list_get(
1197                self.mc.agent_pool_profiles, 0, None
1198            )
1199
1200        # node_count
1201        # read the original value passed by the command
1202        node_count = self.raw_param.get("node_count")
1203        # try to read the property value corresponding to the parameter from the `mc` object
1204        if agent_pool_profile and agent_pool_profile.count is not None:
1205            node_count = agent_pool_profile.count
1206
1207        # enable_cluster_autoscaler
1208        # read the original value passed by the command
1209        enable_cluster_autoscaler = self.raw_param.get("enable_cluster_autoscaler")
1210        # try to read the property value corresponding to the parameter from the `mc` object
1211        if agent_pool_profile and agent_pool_profile.enable_auto_scaling is not None:
1212            enable_cluster_autoscaler = agent_pool_profile.enable_auto_scaling
1213
1214        # min_count
1215        # read the original value passed by the command
1216        min_count = self.raw_param.get("min_count")
1217        # try to read the property value corresponding to the parameter from the `mc` object
1218        if agent_pool_profile and agent_pool_profile.min_count is not None:
1219            min_count = agent_pool_profile.min_count
1220
1221        # max_count
1222        # read the original value passed by the command
1223        max_count = self.raw_param.get("max_count")
1224        # try to read the property value corresponding to the parameter from the `mc` object
1225        if agent_pool_profile and agent_pool_profile.max_count is not None:
1226            max_count = agent_pool_profile.max_count
1227
1228        # these parameters do not need dynamic completion
1229
1230        # validation
1231        validate_counts_in_autoscaler(
1232            node_count,
1233            enable_cluster_autoscaler,
1234            min_count,
1235            max_count,
1236            decorator_mode=DecoratorMode.CREATE,
1237        )
1238        return node_count, enable_cluster_autoscaler, min_count, max_count
1239
1240    # pylint: disable=too-many-branches
1241    def get_update_enable_disable_cluster_autoscaler_and_min_max_count(
1242        self,
1243    ) -> Tuple[bool, bool, bool, Union[int, None], Union[int, None]]:
1244        """Obtain the value of update_cluster_autoscaler, enable_cluster_autoscaler, disable_cluster_autoscaler,
1245        min_count and max_count.
1246
1247        This function will verify the parameters through function "validate_counts_in_autoscaler" by default. Besides if
1248        both enable_cluster_autoscaler and update_cluster_autoscaler are specified, a MutuallyExclusiveArgumentError
1249        will be raised. If enable_cluster_autoscaler or update_cluster_autoscaler is specified and there are multiple
1250        agent pool profiles, an ArgumentUsageError will be raised. If enable_cluster_autoscaler is specified and
1251        autoscaler is already enabled in `mc`, it will output warning messages and exit with code 0. If
1252        update_cluster_autoscaler is specified and autoscaler is not enabled in `mc`, it will raise an
1253        InvalidArgumentValueError. If disable_cluster_autoscaler is specified and autoscaler is not enabled in `mc`,
1254        it will output warning messages and exit with code 0.
1255
1256        :return: a tuple containing four elements: update_cluster_autoscaler of bool type, enable_cluster_autoscaler
1257        of bool type, disable_cluster_autoscaler of bool type, min_count of int type or None and max_count of int type
1258        or None
1259        """
1260        # get agent pool profile from `mc`
1261        agent_pool_profile = None
1262        if self.mc and self.mc.agent_pool_profiles:
1263            agent_pool_profile = safe_list_get(
1264                self.mc.agent_pool_profiles, 0, None
1265            )
1266
1267        # update_cluster_autoscaler
1268        # read the original value passed by the command
1269        update_cluster_autoscaler = self.raw_param.get("update_cluster_autoscaler")
1270
1271        # enable_cluster_autoscaler
1272        # read the original value passed by the command
1273        enable_cluster_autoscaler = self.raw_param.get("enable_cluster_autoscaler")
1274
1275        # disable_cluster_autoscaler
1276        # read the original value passed by the command
1277        disable_cluster_autoscaler = self.raw_param.get("disable_cluster_autoscaler")
1278
1279        # min_count
1280        # read the original value passed by the command
1281        min_count = self.raw_param.get("min_count")
1282
1283        # max_count
1284        # read the original value passed by the command
1285        max_count = self.raw_param.get("max_count")
1286
1287        # these parameters do not need dynamic completion
1288
1289        # validation
1290        # For multi-agent pool, use the az aks nodepool command
1291        if (enable_cluster_autoscaler or update_cluster_autoscaler) and len(self.mc.agent_pool_profiles) > 1:
1292            raise ArgumentUsageError(
1293                'There are more than one node pool in the cluster. Please use "az aks nodepool" command '
1294                "to update per node pool auto scaler settings"
1295            )
1296
1297        if enable_cluster_autoscaler + update_cluster_autoscaler + disable_cluster_autoscaler > 1:
1298            raise MutuallyExclusiveArgumentError(
1299                "Can only specify one of --enable-cluster-autoscaler, --update-cluster-autoscaler and "
1300                "--disable-cluster-autoscaler"
1301            )
1302
1303        validate_counts_in_autoscaler(
1304            None,
1305            enable_cluster_autoscaler or update_cluster_autoscaler,
1306            min_count,
1307            max_count,
1308            decorator_mode=DecoratorMode.UPDATE,
1309        )
1310
1311        if enable_cluster_autoscaler and agent_pool_profile.enable_auto_scaling:
1312            logger.warning(
1313                "Cluster autoscaler is already enabled for this node pool.\n"
1314                'Please run "az aks --update-cluster-autoscaler" '
1315                "if you want to update min-count or max-count."
1316            )
1317            sys.exit(0)
1318
1319        if update_cluster_autoscaler and not agent_pool_profile.enable_auto_scaling:
1320            raise InvalidArgumentValueError(
1321                "Cluster autoscaler is not enabled for this node pool.\n"
1322                'Run "az aks nodepool update --enable-cluster-autoscaler" '
1323                "to enable cluster with min-count and max-count."
1324            )
1325
1326        if disable_cluster_autoscaler and not agent_pool_profile.enable_auto_scaling:
1327            logger.warning(
1328                "Cluster autoscaler is already disabled for this node pool."
1329            )
1330            sys.exit(0)
1331
1332        return update_cluster_autoscaler, enable_cluster_autoscaler, disable_cluster_autoscaler, min_count, max_count
1333
1334    def get_admin_username(self) -> str:
1335        """Obtain the value of admin_username.
1336
1337        Note: SDK performs the following validation {'required': True, 'pattern': r'^[A-Za-z][-A-Za-z0-9_]*$'}.
1338
1339        :return: str
1340        """
1341        # read the original value passed by the command
1342        admin_username = self.raw_param.get("admin_username")
1343        # try to read the property value corresponding to the parameter from the `mc` object
1344        if (
1345            self.mc and
1346            self.mc.linux_profile and
1347            self.mc.linux_profile.admin_username is not None
1348        ):
1349            admin_username = self.mc.linux_profile.admin_username
1350
1351        # this parameter does not need dynamic completion
1352        # this parameter does not need validation
1353        return admin_username
1354
1355    # pylint: disable=unused-argument
1356    def _get_windows_admin_username_and_password(
1357        self, read_only: bool = False, **kwargs
1358    ) -> Tuple[Union[str, None], Union[str, None]]:
1359        """Internal function to dynamically obtain the value of windows_admin_username and windows_admin_password
1360        according to the context.
1361
1362        When ont of windows_admin_username and windows_admin_password is not assigned, dynamic completion will be
1363        triggerd. The user will be prompted to enter the missing windows_admin_username or windows_admin_password in
1364        tty (pseudo terminal). If the program is running in a non-interactive environment, a NoTTYError error will be
1365        raised.
1366
1367        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
1368
1369        :return: a tuple containing two elements: windows_admin_username of string type or None and
1370        windows_admin_password of string type or None
1371        """
1372        # windows_admin_username
1373        # read the original value passed by the command
1374        windows_admin_username = self.raw_param.get("windows_admin_username")
1375        # try to read the property value corresponding to the parameter from the `mc` object
1376        username_read_from_mc = False
1377        if (
1378            self.mc and
1379            self.mc.windows_profile and
1380            self.mc.windows_profile.admin_username is not None
1381        ):
1382            windows_admin_username = self.mc.windows_profile.admin_username
1383            username_read_from_mc = True
1384
1385        # windows_admin_password
1386        # read the original value passed by the command
1387        windows_admin_password = self.raw_param.get("windows_admin_password")
1388        # try to read the property value corresponding to the parameter from the `mc` object
1389        password_read_from_mc = False
1390        if (
1391            self.mc and
1392            self.mc.windows_profile and
1393            self.mc.windows_profile.admin_password is not None
1394        ):
1395            windows_admin_password = self.mc.windows_profile.admin_password
1396            password_read_from_mc = True
1397
1398        # consistent check
1399        if username_read_from_mc != password_read_from_mc:
1400            raise CLIInternalError(
1401                "Inconsistent state detected, one of windows admin name and password is read from the `mc` object."
1402            )
1403
1404        # skip dynamic completion & validation if option read_only is specified
1405        if read_only:
1406            return windows_admin_username, windows_admin_password
1407
1408        username_dynamic_completion = False
1409        # check whether the parameter meet the conditions of dynamic completion
1410        # to avoid that windows_admin_password is set but windows_admin_username is not
1411        if windows_admin_username is None and windows_admin_password:
1412            username_dynamic_completion = True
1413        # disable dynamic completion if the value is read from `mc`
1414        username_dynamic_completion = (
1415            username_dynamic_completion and not username_read_from_mc
1416        )
1417        if username_dynamic_completion:
1418            try:
1419                windows_admin_username = prompt("windows_admin_username: ")
1420                # The validation for admin_username in ManagedClusterWindowsProfile will fail even if
1421                # users still set windows_admin_username to empty here
1422            except NoTTYException:
1423                raise NoTTYError(
1424                    "Please specify username for Windows in non-interactive mode."
1425                )
1426
1427        password_dynamic_completion = False
1428        # check whether the parameter meet the conditions of dynamic completion
1429        # to avoid that windows_admin_username is set but windows_admin_password is not
1430        if windows_admin_password is None and windows_admin_username:
1431            password_dynamic_completion = True
1432        # disable dynamic completion if the value is read from `mc`
1433        password_dynamic_completion = (
1434            password_dynamic_completion and not password_read_from_mc
1435        )
1436        if password_dynamic_completion:
1437            try:
1438                windows_admin_password = prompt_pass(
1439                    msg="windows-admin-password: ", confirm=True
1440                )
1441            except NoTTYException:
1442                raise NoTTYError(
1443                    "Please specify both username and password in non-interactive mode."
1444                )
1445
1446        # these parameters does not need validation
1447        return windows_admin_username, windows_admin_password
1448
1449    def get_windows_admin_username_and_password(
1450        self,
1451    ) -> Tuple[Union[str, None], Union[str, None]]:
1452        """Dynamically obtain the value of windows_admin_username and windows_admin_password according to the context.
1453
1454        When ont of windows_admin_username and windows_admin_password is not assigned, dynamic completion will be
1455        triggerd. The user will be prompted to enter the missing windows_admin_username or windows_admin_password in
1456        tty (pseudo terminal). If the program is running in a non-interactive environment, a NoTTYError error will be
1457        raised.
1458
1459        :return: a tuple containing two elements: windows_admin_username of string type or None and
1460        windows_admin_password of string type or None
1461        """
1462
1463        return self._get_windows_admin_username_and_password()
1464
1465    def get_enable_ahub(self) -> bool:
1466        """Obtain the value of enable_ahub.
1467
1468        Note: enable_ahub will not be directly decorated into the `mc` object.
1469
1470        :return: bool
1471        """
1472        # read the original value passed by the command
1473        enable_ahub = self.raw_param.get("enable_ahub")
1474        # try to read the property value corresponding to the parameter from the `mc` object
1475        if self.mc and self.mc.windows_profile:
1476            enable_ahub = self.mc.windows_profile.license_type == "Windows_Server"
1477
1478        # this parameter does not need dynamic completion
1479        # this parameter does not need validation
1480        return enable_ahub
1481
1482    # pylint: disable=unused-argument,too-many-statements
1483    def _get_service_principal_and_client_secret(
1484        self, read_only: bool = False, **kwargs
1485    ) -> Tuple[Union[str, None], Union[str, None]]:
1486        """Internal function to dynamically obtain the values of service_principal and client_secret according to the
1487        context.
1488
1489        This function will store an intermediate aad_session_key.
1490
1491        When service_principal and client_secret are not assigned and enable_managed_identity is True, dynamic
1492        completion will not be triggered. For other cases, dynamic completion will be triggered.
1493        When client_secret is given but service_principal is not, dns_name_prefix or fqdn_subdomain will be used to
1494        create a service principal. The parameters subscription_id, location and name (cluster) are also required when
1495        calling function "_ensure_aks_service_principal", which internally used GraphRbacManagementClient to send
1496        the request.
1497        When service_principal is given but client_secret is not, function "_ensure_aks_service_principal" would raise
1498        CLIError.
1499
1500        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
1501
1502        :return: a tuple containing two elements: service_principal of string type or None and client_secret of
1503        string type or None
1504        """
1505        # service_principal
1506        # read the original value passed by the command
1507        service_principal = self.raw_param.get("service_principal")
1508        # try to read the property value corresponding to the parameter from the `mc` object
1509        sp_read_from_mc = False
1510        if (
1511            self.mc and
1512            self.mc.service_principal_profile and
1513            self.mc.service_principal_profile.client_id is not None
1514        ):
1515            service_principal = self.mc.service_principal_profile.client_id
1516            sp_read_from_mc = True
1517
1518        # client_secret
1519        # read the original value passed by the command
1520        client_secret = self.raw_param.get("client_secret")
1521        # try to read the property value corresponding to the parameter from the `mc` object
1522        secret_read_from_mc = False
1523        if (
1524            self.mc and
1525            self.mc.service_principal_profile and
1526            self.mc.service_principal_profile.secret is not None
1527        ):
1528            client_secret = self.mc.service_principal_profile.secret
1529            secret_read_from_mc = True
1530
1531        # consistent check
1532        if sp_read_from_mc != secret_read_from_mc:
1533            raise CLIInternalError(
1534                "Inconsistent state detected, one of sp and secret is read from the `mc` object."
1535            )
1536
1537        # skip dynamic completion & validation if option read_only is specified
1538        if read_only:
1539            return service_principal, client_secret
1540
1541        # dynamic completion for service_principal and client_secret
1542        dynamic_completion = False
1543        # check whether the parameter meet the conditions of dynamic completion
1544        enable_managed_identity = self._get_enable_managed_identity(read_only=True)
1545        if not (
1546            enable_managed_identity and
1547            not service_principal and
1548            not client_secret
1549        ):
1550            dynamic_completion = True
1551        # disable dynamic completion if the value is read from `mc`
1552        dynamic_completion = (
1553            dynamic_completion and
1554            not sp_read_from_mc and
1555            not secret_read_from_mc
1556        )
1557        if dynamic_completion:
1558            principal_obj = _ensure_aks_service_principal(
1559                cli_ctx=self.cmd.cli_ctx,
1560                service_principal=service_principal,
1561                client_secret=client_secret,
1562                subscription_id=self.get_subscription_id(),
1563                dns_name_prefix=self._get_dns_name_prefix(enable_validation=False),
1564                fqdn_subdomain=self._get_fqdn_subdomain(enable_validation=False),
1565                location=self.get_location(),
1566                name=self.get_name(),
1567            )
1568            service_principal = principal_obj.get("service_principal")
1569            client_secret = principal_obj.get("client_secret")
1570            self.set_intermediate("aad_session_key", principal_obj.get("aad_session_key"), overwrite_exists=True)
1571
1572        # these parameters do not need validation
1573        return service_principal, client_secret
1574
1575    def get_service_principal_and_client_secret(
1576        self
1577    ) -> Tuple[Union[str, None], Union[str, None]]:
1578        """Dynamically obtain the values of service_principal and client_secret according to the context.
1579
1580        When service_principal and client_secret are not assigned and enable_managed_identity is True, dynamic
1581        completion will not be triggered. For other cases, dynamic completion will be triggered.
1582        When client_secret is given but service_principal is not, dns_name_prefix or fqdn_subdomain will be used to
1583        create a service principal. The parameters subscription_id, location and name (cluster) are also required when
1584        calling function "_ensure_aks_service_principal", which internally used GraphRbacManagementClient to send
1585        the request.
1586        When service_principal is given but client_secret is not, function "_ensure_aks_service_principal" would raise
1587        CLIError.
1588
1589        :return: a tuple containing two elements: service_principal of string type or None and client_secret of
1590        string type or None
1591        """
1592
1593        return self._get_service_principal_and_client_secret()
1594
1595    # pylint: disable=unused-argument
1596    def _get_enable_managed_identity(
1597        self, enable_validation: bool = False, read_only: bool = False, **kwargs
1598    ) -> bool:
1599        """Internal function to dynamically obtain the values of service_principal and client_secret according to the
1600        context.
1601
1602        Note: enable_managed_identity will not be directly decorated into the `mc` object.
1603
1604        When both service_principal and client_secret are assigned and enable_managed_identity is True, dynamic
1605        completion will be triggered. The value of enable_managed_identity will be set to False.
1606
1607        This function supports the option of enable_validation. When enabled, if enable_managed_identity is not
1608        specified and assign_identity is assigned, a RequiredArgumentMissingError will be raised.
1609        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
1610
1611        :return: bool
1612        """
1613        # read the original value passed by the command
1614        enable_managed_identity = self.raw_param.get("enable_managed_identity")
1615        # try to read the property value corresponding to the parameter from the `mc` object
1616        read_from_mc = False
1617        if self.mc and self.mc.identity:
1618            enable_managed_identity = check_is_msi_cluster(self.mc)
1619            read_from_mc = True
1620
1621        # skip dynamic completion & validation if option read_only is specified
1622        if read_only:
1623            return enable_managed_identity
1624
1625        # dynamic completion
1626        (
1627            service_principal,
1628            client_secret,
1629        ) = self._get_service_principal_and_client_secret(read_only=True)
1630        if not read_from_mc and service_principal and client_secret:
1631            enable_managed_identity = False
1632
1633        # validation
1634        if enable_validation:
1635            if not enable_managed_identity and self._get_assign_identity(enable_validation=False):
1636                raise RequiredArgumentMissingError(
1637                    "--assign-identity can only be specified when --enable-managed-identity is specified"
1638                )
1639        return enable_managed_identity
1640
1641    def get_enable_managed_identity(self) -> bool:
1642        """Dynamically obtain the values of service_principal and client_secret according to the context.
1643
1644        Note: enable_managed_identity will not be directly decorated into the `mc` object.
1645
1646        When both service_principal and client_secret are assigned and enable_managed_identity is True, dynamic
1647        completion will be triggered. The value of enable_managed_identity will be set to False.
1648
1649        This function will verify the parameter by default. If enable_managed_identity is not specified and
1650        assign_identity is assigned, a RequiredArgumentMissingError will be raised.
1651
1652        :return: bool
1653        """
1654
1655        return self._get_enable_managed_identity(enable_validation=True)
1656
1657    def get_skip_subnet_role_assignment(self) -> bool:
1658        """Obtain the value of skip_subnet_role_assignment.
1659
1660        Note: skip_subnet_role_assignment will not be decorated into the `mc` object.
1661
1662        :return: bool
1663        """
1664        # read the original value passed by the command
1665        skip_subnet_role_assignment = self.raw_param.get("skip_subnet_role_assignment")
1666
1667        # this parameter does not need dynamic completion
1668        # this parameter does not need validation
1669        return skip_subnet_role_assignment
1670
1671    # pylint: disable=unused-argument
1672    def _get_assign_identity(self, enable_validation: bool = False, **kwargs) -> Union[str, None]:
1673        """Internal function to obtain the value of assign_identity.
1674
1675        This function supports the option of enable_validation. When enabled, if enable_managed_identity is not
1676        specified and assign_identity is assigned, a RequiredArgumentMissingError will be raised. Besides, if
1677        assign_identity is not assigned but assign_kubelet_identity is, a RequiredArgumentMissingError will be raised.
1678
1679        :return: string or None
1680        """
1681        # read the original value passed by the command
1682        raw_value = self.raw_param.get("assign_identity")
1683        # try to read the property value corresponding to the parameter from the `mc` object
1684        value_obtained_from_mc = None
1685        if (
1686            self.mc and
1687            self.mc.identity and
1688            self.mc.identity.user_assigned_identities is not None
1689        ):
1690            value_obtained_from_mc = safe_list_get(
1691                list(self.mc.identity.user_assigned_identities.keys()), 0, None
1692            )
1693
1694        # set default value
1695        if value_obtained_from_mc is not None:
1696            assign_identity = value_obtained_from_mc
1697        else:
1698            assign_identity = raw_value
1699
1700        # this parameter does not need dynamic completion
1701
1702        # validation
1703        if enable_validation:
1704            if assign_identity:
1705                if not self._get_enable_managed_identity(enable_validation=False):
1706                    raise RequiredArgumentMissingError(
1707                        "--assign-identity can only be specified when --enable-managed-identity is specified"
1708                    )
1709            else:
1710                if self.get_assign_kubelet_identity():
1711                    raise RequiredArgumentMissingError(
1712                        "--assign-kubelet-identity can only be specified when --assign-identity is specified"
1713                    )
1714        return assign_identity
1715
1716    def get_assign_identity(self) -> Union[str, None]:
1717        """Obtain the value of assign_identity.
1718
1719        This function will verify the parameter by default. If enable_managed_identity is not specified and
1720        assign_identity is assigned, a RequiredArgumentMissingError will be raised. Besides, if assign_identity is not
1721        assigned but assign_kubelet_identity is, a RequiredArgumentMissingError will be raised.
1722
1723        :return: string or None
1724        """
1725
1726        return self._get_assign_identity(enable_validation=True)
1727
1728    def get_identity_by_msi_client(self, assigned_identity: str) -> Identity:
1729        """Helper function to obtain the identity object by msi client.
1730
1731        Note: This is a wrapper of the external function "_get_user_assigned_identity", and the return result of this
1732        function will not be directly decorated into the `mc` object.
1733
1734        This function will use ManagedServiceIdentityClient to send the request, and return an identity object.
1735        ResourceNotFoundError, ClientRequestError or InvalidArgumentValueError exceptions might be raised in the above
1736        process.
1737
1738        :return: string
1739        """
1740        return _get_user_assigned_identity(self.cmd.cli_ctx, assigned_identity)
1741
1742    def get_user_assigned_identity_client_id(self) -> str:
1743        """Helper function to obtain the client_id of user assigned identity.
1744
1745        Note: This is not a parameter of aks_create, and it will not be decorated into the `mc` object.
1746
1747        Parse assign_identity and use ManagedServiceIdentityClient to send the request, get the client_id field in the
1748        returned identity object. ResourceNotFoundError, ClientRequestError or InvalidArgumentValueError exceptions
1749        may be raised in the above process.
1750
1751        :return: string
1752        """
1753        assigned_identity = self.get_assign_identity()
1754        if assigned_identity is None or assigned_identity == "":
1755            raise RequiredArgumentMissingError("No assigned identity provided.")
1756        return self.get_identity_by_msi_client(assigned_identity).client_id
1757
1758    def get_user_assigned_identity_object_id(self) -> str:
1759        """Helper function to obtain the principal_id of user assigned identity.
1760
1761        Note: This is not a parameter of aks_create, and it will not be decorated into the `mc` object.
1762
1763        Parse assign_identity and use ManagedServiceIdentityClient to send the request, get the principal_id field in
1764        the returned identity object. ResourceNotFoundError, ClientRequestError or InvalidArgumentValueError exceptions
1765        may be raised in the above process.
1766
1767        :return: string
1768        """
1769        assigned_identity = self.get_assign_identity()
1770        if assigned_identity is None or assigned_identity == "":
1771            raise RequiredArgumentMissingError("No assigned identity provided.")
1772        return self.get_identity_by_msi_client(assigned_identity).principal_id
1773
1774    def get_yes(self) -> bool:
1775        """Obtain the value of yes.
1776
1777        Note: yes will not be decorated into the `mc` object.
1778
1779        :return: bool
1780        """
1781        # read the original value passed by the command
1782        yes = self.raw_param.get("yes")
1783
1784        # this parameter does not need dynamic completion
1785        # this parameter does not need validation
1786        return yes
1787
1788    def get_no_wait(self) -> bool:
1789        """Obtain the value of no_wait.
1790
1791        Note: no_wait will not be decorated into the `mc` object.
1792
1793        :return: bool
1794        """
1795        # read the original value passed by the command
1796        no_wait = self.raw_param.get("no_wait")
1797
1798        # this parameter does not need dynamic completion
1799        # this parameter does not need validation
1800        return no_wait
1801
1802    def get_attach_acr(self) -> Union[str, None]:
1803        """Obtain the value of attach_acr.
1804
1805        Note: attach_acr will not be decorated into the `mc` object.
1806
1807        This function will verify the parameter by default in create mode. When attach_acr is assigned, if both
1808        enable_managed_identity and no_wait are assigned, a MutuallyExclusiveArgumentError will be raised; if
1809        service_principal is not assigned, raise a RequiredArgumentMissingError.
1810
1811        :return: string or None
1812        """
1813        # read the original value passed by the command
1814        attach_acr = self.raw_param.get("attach_acr")
1815
1816        # this parameter does not need dynamic completion
1817        # validation
1818        if self.decorator_mode == DecoratorMode.CREATE and attach_acr:
1819            if self._get_enable_managed_identity(enable_validation=False) and self.get_no_wait():
1820                raise MutuallyExclusiveArgumentError(
1821                    "When --attach-acr and --enable-managed-identity are both specified, "
1822                    "--no-wait is not allowed, please wait until the whole operation succeeds."
1823                )
1824                # Attach acr operation will be handled after the cluster is created
1825            # newly added check, check whether client_id exists before creating role assignment
1826            service_principal, _ = self._get_service_principal_and_client_secret(read_only=True)
1827            if not service_principal:
1828                raise RequiredArgumentMissingError(
1829                    "No service principal provided to create the acrpull role assignment for acr."
1830                )
1831        return attach_acr
1832
1833    def get_detach_acr(self) -> Union[str, None]:
1834        """Obtain the value of detach_acr.
1835
1836        Note: detach_acr will not be decorated into the `mc` object.
1837
1838        :return: string or None
1839        """
1840        # read the original value passed by the command
1841        detach_acr = self.raw_param.get("detach_acr")
1842
1843        # this parameter does not need dynamic completion
1844        # this parameter does not need validation
1845        return detach_acr
1846
1847    # pylint: disable=unused-argument
1848    def _get_load_balancer_sku(
1849        self, enable_validation: bool = False, read_only: bool = False, **kwargs
1850    ) -> Union[str, None]:
1851        """Internal function to dynamically obtain the value of load_balancer_sku according to the context.
1852
1853        Note: When returning a string, it will always be lowercase.
1854
1855        When load_balancer_sku is not assigned, dynamic completion will be triggerd. Function "set_load_balancer_sku"
1856        will be called and the corresponding load balancer sku will be returned according to the value of
1857        kubernetes_version.
1858
1859        This function supports the option of enable_validation. When enabled, it will check if load_balancer_sku equals
1860        to "basic", if so, when api_server_authorized_ip_ranges is assigned or enable_private_cluster is specified,
1861        raise an InvalidArgumentValueError.
1862        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
1863
1864        :return: string or None
1865        """
1866        # read the original value passed by the command
1867        load_balancer_sku = safe_lower(self.raw_param.get("load_balancer_sku"))
1868        # try to read the property value corresponding to the parameter from the `mc` object
1869        read_from_mc = False
1870        if (
1871            self.mc and
1872            self.mc.network_profile and
1873            self.mc.network_profile.load_balancer_sku is not None
1874        ):
1875            load_balancer_sku = safe_lower(
1876                self.mc.network_profile.load_balancer_sku
1877            )
1878            read_from_mc = True
1879
1880        # skip dynamic completion & validation if option read_only is specified
1881        if read_only:
1882            return load_balancer_sku
1883
1884        # dynamic completion
1885        if not read_from_mc and load_balancer_sku is None:
1886            load_balancer_sku = safe_lower(
1887                set_load_balancer_sku(
1888                    sku=load_balancer_sku,
1889                    kubernetes_version=self.get_kubernetes_version(),
1890                )
1891            )
1892
1893        # validation
1894        if enable_validation:
1895            if load_balancer_sku == "basic":
1896                if self.get_api_server_authorized_ip_ranges():
1897                    raise InvalidArgumentValueError(
1898                        "--api-server-authorized-ip-ranges can only be used with standard load balancer"
1899                    )
1900                if self.get_enable_private_cluster():
1901                    raise InvalidArgumentValueError(
1902                        "Please use standard load balancer for private cluster"
1903                    )
1904
1905        return load_balancer_sku
1906
1907    def get_load_balancer_sku(self) -> Union[str, None]:
1908        """Dynamically obtain the value of load_balancer_sku according to the context.
1909
1910        Note: When returning a string, it will always be lowercase.
1911
1912        When load_balancer_sku is not assigned, dynamic completion will be triggerd. Function "set_load_balancer_sku"
1913        will be called and the corresponding load balancer sku will be returned according to the value of
1914        kubernetes_version.
1915
1916        This function will verify the parameter by default. It will check if load_balancer_sku equals to "basic", if so,
1917        when api_server_authorized_ip_ranges is assigned or enable_private_cluster is specified, raise an
1918        InvalidArgumentValueError.
1919
1920        :return: string or None
1921        """
1922
1923        return safe_lower(self._get_load_balancer_sku(enable_validation=True))
1924
1925    def get_load_balancer_managed_outbound_ip_count(self) -> Union[int, None]:
1926        """Obtain the value of load_balancer_managed_outbound_ip_count.
1927
1928        Note: SDK performs the following validation {'maximum': 100, 'minimum': 1}.
1929
1930        :return: int or None
1931        """
1932        # read the original value passed by the command
1933        load_balancer_managed_outbound_ip_count = self.raw_param.get(
1934            "load_balancer_managed_outbound_ip_count"
1935        )
1936        # try to read the property value corresponding to the parameter from the `mc` object
1937        if (
1938            self.mc and
1939            self.mc.network_profile and
1940            self.mc.network_profile.load_balancer_profile and
1941            self.mc.network_profile.load_balancer_profile.managed_outbound_i_ps and
1942            self.mc.network_profile.load_balancer_profile.managed_outbound_i_ps.count is not None
1943        ):
1944            load_balancer_managed_outbound_ip_count = (
1945                self.mc.network_profile.load_balancer_profile.managed_outbound_i_ps.count
1946            )
1947
1948        # this parameter does not need dynamic completion
1949        # this parameter does not need validation
1950        return load_balancer_managed_outbound_ip_count
1951
1952    def get_load_balancer_outbound_ips(self) -> Union[str, List[ResourceReference], None]:
1953        """Obtain the value of load_balancer_outbound_ips.
1954
1955        Note: SDK performs the following validation {'maximum': 16, 'minimum': 1}.
1956
1957        :return: string, list of ResourceReference, or None
1958        """
1959        # read the original value passed by the command
1960        load_balancer_outbound_ips = self.raw_param.get(
1961            "load_balancer_outbound_ips"
1962        )
1963        # try to read the property value corresponding to the parameter from the `mc` object
1964        if (
1965            self.mc and
1966            self.mc.network_profile and
1967            self.mc.network_profile.load_balancer_profile and
1968            self.mc.network_profile.load_balancer_profile.outbound_i_ps and
1969            self.mc.network_profile.load_balancer_profile.outbound_i_ps.public_i_ps is not None
1970        ):
1971            load_balancer_outbound_ips = (
1972                self.mc.network_profile.load_balancer_profile.outbound_i_ps.public_i_ps
1973            )
1974
1975        # this parameter does not need dynamic completion
1976        # this parameter does not need validation
1977        return load_balancer_outbound_ips
1978
1979    def get_load_balancer_outbound_ip_prefixes(self) -> Union[str, List[ResourceReference], None]:
1980        """Obtain the value of load_balancer_outbound_ip_prefixes.
1981
1982        :return: string, list of ResourceReference, or None
1983        """
1984        # read the original value passed by the command
1985        load_balancer_outbound_ip_prefixes = self.raw_param.get(
1986            "load_balancer_outbound_ip_prefixes"
1987        )
1988        # try to read the property value corresponding to the parameter from the `mc` object
1989        if (
1990            self.mc and
1991            self.mc.network_profile and
1992            self.mc.network_profile.load_balancer_profile and
1993            self.mc.network_profile.load_balancer_profile.outbound_ip_prefixes and
1994            self.mc.network_profile.load_balancer_profile.outbound_ip_prefixes.public_ip_prefixes is not None
1995        ):
1996            load_balancer_outbound_ip_prefixes = (
1997                self.mc.network_profile.load_balancer_profile.outbound_ip_prefixes.public_ip_prefixes
1998            )
1999
2000        # this parameter does not need dynamic completion
2001        # this parameter does not need validation
2002        return load_balancer_outbound_ip_prefixes
2003
2004    def get_load_balancer_outbound_ports(self) -> Union[int, None]:
2005        """Obtain the value of load_balancer_outbound_ports.
2006
2007        Note: SDK performs the following validation {'maximum': 64000, 'minimum': 0}.
2008
2009        :return: int or None
2010        """
2011        # read the original value passed by the command
2012        load_balancer_outbound_ports = self.raw_param.get(
2013            "load_balancer_outbound_ports"
2014        )
2015        # try to read the property value corresponding to the parameter from the `mc` object
2016        if (
2017            self.mc and
2018            self.mc.network_profile and
2019            self.mc.network_profile.load_balancer_profile and
2020            self.mc.network_profile.load_balancer_profile.allocated_outbound_ports is not None
2021        ):
2022            load_balancer_outbound_ports = (
2023                self.mc.network_profile.load_balancer_profile.allocated_outbound_ports
2024            )
2025
2026        # this parameter does not need dynamic completion
2027        # this parameter does not need validation
2028        return load_balancer_outbound_ports
2029
2030    def get_load_balancer_idle_timeout(self) -> Union[int, None]:
2031        """Obtain the value of load_balancer_idle_timeout.
2032
2033        Note: SDK performs the following validation {'maximum': 120, 'minimum': 4}.
2034
2035        :return: int or None
2036        """
2037        # read the original value passed by the command
2038        load_balancer_idle_timeout = self.raw_param.get(
2039            "load_balancer_idle_timeout"
2040        )
2041        # try to read the property value corresponding to the parameter from the `mc` object
2042        if (
2043            self.mc and
2044            self.mc.network_profile and
2045            self.mc.network_profile.load_balancer_profile and
2046            self.mc.network_profile.load_balancer_profile.idle_timeout_in_minutes is not None
2047        ):
2048            load_balancer_idle_timeout = (
2049                self.mc.network_profile.load_balancer_profile.idle_timeout_in_minutes
2050            )
2051
2052        # this parameter does not need dynamic completion
2053        # this parameter does not need validation
2054        return load_balancer_idle_timeout
2055
2056    # pylint: disable=unused-argument
2057    def _get_outbound_type(
2058        self,
2059        enable_validation: bool = False,
2060        read_only: bool = False,
2061        load_balancer_profile: ManagedClusterLoadBalancerProfile = None,
2062        **kwargs
2063    ) -> Union[str, None]:
2064        """Internal function to dynamically obtain the value of outbound_type according to the context.
2065
2066        Note: All the external parameters involved in the validation are not verified in their own getters.
2067
2068        When outbound_type is not assigned, dynamic completion will be triggerd. By default, the value is set to
2069        CONST_OUTBOUND_TYPE_LOAD_BALANCER.
2070
2071        This function supports the option of enable_validation. When enabled, if the value of outbound_type is
2072        CONST_OUTBOUND_TYPE_USER_DEFINED_ROUTING, the following checks will be performed. If load_balancer_sku is set
2073        to basic, an InvalidArgumentValueError will be raised. If vnet_subnet_id is not assigned,
2074        a RequiredArgumentMissingError will be raised. If any of load_balancer_managed_outbound_ip_count,
2075        load_balancer_outbound_ips or load_balancer_outbound_ip_prefixes is assigned, a MutuallyExclusiveArgumentError
2076        will be raised.
2077        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
2078        This function supports the option of load_balancer_profile, if provided, when verifying loadbalancer-related
2079        parameters, the value in load_balancer_profile will be used for validation.
2080
2081        :return: string or None
2082        """
2083        # read the original value passed by the command
2084        outbound_type = self.raw_param.get("outbound_type")
2085        # try to read the property value corresponding to the parameter from the `mc` object
2086        read_from_mc = False
2087        if (
2088            self.mc and
2089            self.mc.network_profile and
2090            self.mc.network_profile.outbound_type is not None
2091        ):
2092            outbound_type = self.mc.network_profile.outbound_type
2093            read_from_mc = True
2094
2095        # skip dynamic completion & validation if option read_only is specified
2096        if read_only:
2097            return outbound_type
2098
2099        # dynamic completion
2100        if not read_from_mc and outbound_type != CONST_OUTBOUND_TYPE_USER_DEFINED_ROUTING:
2101            outbound_type = CONST_OUTBOUND_TYPE_LOAD_BALANCER
2102
2103        # validation
2104        # Note: The parameters involved in the validation are not verified in their own getters.
2105        if enable_validation:
2106            if outbound_type == CONST_OUTBOUND_TYPE_USER_DEFINED_ROUTING:
2107                # Should not enable read_only for get_load_balancer_sku, since its default value is None, and it has
2108                # not been decorated into the mc object at this time, only the value after dynamic completion is
2109                # meaningful here.
2110                if safe_lower(self._get_load_balancer_sku(enable_validation=False)) == "basic":
2111                    raise InvalidArgumentValueError(
2112                        "userDefinedRouting doesn't support basic load balancer sku"
2113                    )
2114
2115                if self.get_vnet_subnet_id() in ["", None]:
2116                    raise RequiredArgumentMissingError(
2117                        "--vnet-subnet-id must be specified for userDefinedRouting and it must "
2118                        "be pre-configured with a route table with egress rules"
2119                    )
2120
2121                if load_balancer_profile:
2122                    if (
2123                        load_balancer_profile.managed_outbound_i_ps or
2124                        load_balancer_profile.outbound_i_ps or
2125                        load_balancer_profile.outbound_ip_prefixes
2126                    ):
2127                        raise MutuallyExclusiveArgumentError(
2128                            "userDefinedRouting doesn't support customizing a standard load balancer with IP addresses"
2129                        )
2130                else:
2131                    if (
2132                        self.get_load_balancer_managed_outbound_ip_count() or
2133                        self.get_load_balancer_outbound_ips() or
2134                        self.get_load_balancer_outbound_ip_prefixes()
2135                    ):
2136                        raise MutuallyExclusiveArgumentError(
2137                            "userDefinedRouting doesn't support customizing a standard load balancer with IP addresses"
2138                        )
2139
2140        return outbound_type
2141
2142    def get_outbound_type(
2143        self,
2144        load_balancer_profile: ManagedClusterLoadBalancerProfile = None
2145    ) -> Union[str, None]:
2146        """Dynamically obtain the value of outbound_type according to the context.
2147
2148        Note: All the external parameters involved in the validation are not verified in their own getters.
2149
2150        When outbound_type is not assigned, dynamic completion will be triggerd. By default, the value is set to
2151        CONST_OUTBOUND_TYPE_LOAD_BALANCER.
2152
2153        This function will verify the parameter by default. If the value of outbound_type is
2154        CONST_OUTBOUND_TYPE_USER_DEFINED_ROUTING, the following checks will be performed. If load_balancer_sku is set
2155        to basic, an InvalidArgumentValueError will be raised. If vnet_subnet_id is not assigned,
2156        a RequiredArgumentMissingError will be raised. If any of load_balancer_managed_outbound_ip_count,
2157        load_balancer_outbound_ips or load_balancer_outbound_ip_prefixes is assigned, a MutuallyExclusiveArgumentError
2158        will be raised.
2159
2160        This function supports the option of load_balancer_profile, if provided, when verifying loadbalancer-related
2161        parameters, the value in load_balancer_profile will be used for validation.
2162
2163        :return: string or None
2164        """
2165
2166        return self._get_outbound_type(
2167            enable_validation=True, load_balancer_profile=load_balancer_profile
2168        )
2169
2170    # pylint: disable=unused-argument
2171    def _get_network_plugin(self, enable_validation: bool = False, **kwargs) -> Union[str, None]:
2172        """Internal function to Obtain the value of network_plugin.
2173
2174        Note: SDK provides default value "kubenet" for network_plugin.
2175
2176        This function supports the option of enable_validation. When enabled, in case network_plugin is assigned, if
2177        pod_cidr is assigned and the value of network_plugin is azure, an InvalidArgumentValueError will be
2178        raised; otherwise, if any of pod_cidr, service_cidr, dns_service_ip, docker_bridge_address or network_policy
2179        is assigned, a RequiredArgumentMissingError will be raised.
2180
2181        :return: string or None
2182        """
2183        # read the original value passed by the command
2184        network_plugin = self.raw_param.get("network_plugin")
2185        # try to read the property value corresponding to the parameter from the `mc` object
2186        if (
2187            self.mc and
2188            self.mc.network_profile and
2189            self.mc.network_profile.network_plugin is not None
2190        ):
2191            network_plugin = self.mc.network_profile.network_plugin
2192
2193        # this parameter does not need dynamic completion
2194
2195        # validation
2196        if enable_validation:
2197            (
2198                pod_cidr,
2199                service_cidr,
2200                dns_service_ip,
2201                docker_bridge_address,
2202                network_policy,
2203            ) = (
2204                self.get_pod_cidr_and_service_cidr_and_dns_service_ip_and_docker_bridge_address_and_network_policy()
2205            )
2206            if network_plugin:
2207                if network_plugin == "azure" and pod_cidr:
2208                    raise InvalidArgumentValueError(
2209                        "Please use kubenet as the network plugin type when pod_cidr is specified"
2210                    )
2211            else:
2212                if (
2213                    pod_cidr or
2214                    service_cidr or
2215                    dns_service_ip or
2216                    docker_bridge_address or
2217                    network_policy
2218                ):
2219                    raise RequiredArgumentMissingError(
2220                        "Please explicitly specify the network plugin type"
2221                    )
2222        return network_plugin
2223
2224    def get_network_plugin(self) -> Union[str, None]:
2225        """Obtain the value of network_plugin.
2226
2227        Note: SDK provides default value "kubenet" for network_plugin.
2228
2229        This function will verify the parameter by default. In case network_plugin is assigned, if pod_cidr is assigned
2230        and the value of network_plugin is azure, an InvalidArgumentValueError will be raised; otherwise, if any of
2231        pod_cidr, service_cidr, dns_service_ip, docker_bridge_address or network_policy is assigned, a
2232        RequiredArgumentMissingError will be raised.
2233
2234        :return: string or None
2235        """
2236
2237        return self._get_network_plugin(enable_validation=True)
2238
2239    def get_pod_cidr_and_service_cidr_and_dns_service_ip_and_docker_bridge_address_and_network_policy(
2240        self,
2241    ) -> Tuple[
2242        Union[str, None],
2243        Union[str, None],
2244        Union[str, None],
2245        Union[str, None],
2246        Union[str, None],
2247    ]:
2248        """Obtain the value of pod_cidr, service_cidr, dns_service_ip, docker_bridge_address and network_policy.
2249
2250        Note: SDK provides default value "10.244.0.0/16" and performs the following validation
2251        {'pattern': r'^([0-9]{1,3}\\.){3}[0-9]{1,3}(\\/([0-9]|[1-2][0-9]|3[0-2]))?$'} for pod_cidr.
2252        Note: SDK provides default value "10.0.0.0/16" and performs the following validation
2253        {'pattern': r'^([0-9]{1,3}\\.){3}[0-9]{1,3}(\\/([0-9]|[1-2][0-9]|3[0-2]))?$'} for service_cidr.
2254        Note: SDK provides default value "10.0.0.10" and performs the following validation
2255        {'pattern': r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'}
2256        for dns_service_ip.
2257        Note: SDK provides default value "172.17.0.1/16" and performs the following validation
2258        {'pattern': r'^([0-9]{1,3}\\.){3}[0-9]{1,3}(\\/([0-9]|[1-2][0-9]|3[0-2]))?$'} for docker_bridge_address.
2259
2260        This function will verify the parameters by default. If pod_cidr is assigned and the value of network_plugin
2261        is azure, an InvalidArgumentValueError will be raised; otherwise, if any of pod_cidr, service_cidr,
2262        dns_service_ip, docker_bridge_address or network_policy is assigned, a RequiredArgumentMissingError will be
2263        raised.
2264
2265        :return: a tuple of five elements: pod_cidr of string type or None, service_cidr of string type or None,
2266        dns_service_ip of string type or None, docker_bridge_address of string type or None, network_policy of
2267        string type or None.
2268        """
2269        # get network profile from `mc`
2270        network_profile = None
2271        if self.mc:
2272            network_profile = self.mc.network_profile
2273
2274        # pod_cidr
2275        # read the original value passed by the command
2276        pod_cidr = self.raw_param.get("pod_cidr")
2277        # try to read the property value corresponding to the parameter from the `mc` object
2278        if network_profile and network_profile.pod_cidr is not None:
2279            pod_cidr = network_profile.pod_cidr
2280
2281        # service_cidr
2282        # read the original value passed by the command
2283        service_cidr = self.raw_param.get("service_cidr")
2284        # try to read the property value corresponding to the parameter from the `mc` object
2285        if network_profile and network_profile.service_cidr is not None:
2286            service_cidr = network_profile.service_cidr
2287
2288        # dns_service_ip
2289        # read the original value passed by the command
2290        dns_service_ip = self.raw_param.get("dns_service_ip")
2291        # try to read the property value corresponding to the parameter from the `mc` object
2292        if network_profile and network_profile.dns_service_ip is not None:
2293            dns_service_ip = network_profile.dns_service_ip
2294
2295        # docker_bridge_address
2296        # read the original value passed by the command
2297        docker_bridge_address = self.raw_param.get("docker_bridge_address")
2298        # try to read the property value corresponding to the parameter from the `mc` object
2299        if network_profile and network_profile.docker_bridge_cidr is not None:
2300            docker_bridge_address = network_profile.docker_bridge_cidr
2301
2302        # network_policy
2303        # read the original value passed by the command
2304        network_policy = self.raw_param.get("network_policy")
2305        # try to read the property value corresponding to the parameter from the `mc` object
2306        if network_profile and network_profile.network_policy is not None:
2307            network_policy = network_profile.network_policy
2308
2309        # these parameters do not need dynamic completion
2310
2311        # validation
2312        network_plugin = self._get_network_plugin(enable_validation=False)
2313        if network_plugin:
2314            if network_plugin == "azure" and pod_cidr:
2315                raise InvalidArgumentValueError(
2316                    "Please use kubenet as the network plugin type when pod_cidr is specified"
2317                )
2318        else:
2319            if (
2320                pod_cidr or
2321                service_cidr or
2322                dns_service_ip or
2323                docker_bridge_address or
2324                network_policy
2325            ):
2326                raise RequiredArgumentMissingError(
2327                    "Please explicitly specify the network plugin type"
2328                )
2329        return pod_cidr, service_cidr, dns_service_ip, docker_bridge_address, network_policy
2330
2331    # pylint: disable=unused-argument
2332    def _get_enable_addons(self, enable_validation: bool = False, **kwargs) -> List[str]:
2333        """Internal function to obtain the value of enable_addons.
2334
2335        Note: enable_addons will not be directly decorated into the `mc` object and we do not support to fetch it from
2336        `mc`.
2337        Note: Some of the external parameters involved in the validation are not verified in their own getters.
2338
2339        This function supports the option of enable_validation. When enabled, it will check whether the provided addons
2340        have duplicate or invalid values, and raise an InvalidArgumentValueError if found. Besides, if monitoring is
2341        specified in enable_addons but workspace_resource_id is not assigned, or virtual-node is specified but
2342        aci_subnet_name or vnet_subnet_id is not, a RequiredArgumentMissingError will be raised.
2343        This function will normalize the parameter by default. It will split the string into a list with "," as the
2344        delimiter.
2345
2346        :return: empty list or list of strings
2347        """
2348        # read the original value passed by the command
2349        enable_addons = self.raw_param.get("enable_addons")
2350
2351        # normalize
2352        enable_addons = enable_addons.split(',') if enable_addons else []
2353
2354        # validation
2355        if enable_validation:
2356            # check duplicate addons
2357            duplicate_addons_set = {
2358                x for x in enable_addons if enable_addons.count(x) >= 2
2359            }
2360            if len(duplicate_addons_set) != 0:
2361                raise InvalidArgumentValueError(
2362                    "Duplicate addon{} '{}' found in option --enable-addons.".format(
2363                        "s" if len(duplicate_addons_set) > 1 else "",
2364                        ",".join(duplicate_addons_set),
2365                    )
2366                )
2367
2368            # check unrecognized addons
2369            enable_addons_set = set(enable_addons)
2370            invalid_addons_set = enable_addons_set.difference(ADDONS.keys())
2371            if len(invalid_addons_set) != 0:
2372                raise InvalidArgumentValueError(
2373                    "'{}' {} not recognized by the --enable-addons argument.".format(
2374                        ",".join(invalid_addons_set),
2375                        "are" if len(invalid_addons_set) > 1 else "is",
2376                    )
2377                )
2378
2379            # check monitoring/workspace_resource_id
2380            workspace_resource_id = self._get_workspace_resource_id(read_only=True)
2381            if "monitoring" not in enable_addons and workspace_resource_id:
2382                raise RequiredArgumentMissingError(
2383                    '"--workspace-resource-id" requires "--enable-addons monitoring".')
2384
2385            # check virtual node/aci_subnet_name/vnet_subnet_id
2386            # Note: The external parameters involved in the validation are not verified in their own getters.
2387            aci_subnet_name = self.get_aci_subnet_name()
2388            vnet_subnet_id = self.get_vnet_subnet_id()
2389            if "virtual-node" in enable_addons and not (aci_subnet_name and vnet_subnet_id):
2390                raise RequiredArgumentMissingError(
2391                    '"--enable-addons virtual-node" requires "--aci-subnet-name" and "--vnet-subnet-id".')
2392        return enable_addons
2393
2394    def get_enable_addons(self) -> List[str]:
2395        """Obtain the value of enable_addons.
2396
2397        Note: enable_addons will not be directly decorated into the `mc` object and we do not support to fetch it from
2398        `mc`.
2399        Note: Some of the external parameters involved in the validation are not verified in their own getters.
2400
2401        This function will verify the parameters by default. It will check whether the provided addons have duplicate or
2402        invalid values, and raise an InvalidArgumentValueError if found. Besides, if monitoring is specified in
2403        enable_addons but workspace_resource_id is not assigned, or virtual-node is specified but aci_subnet_name or
2404        vnet_subnet_id is not, a RequiredArgumentMissingError will be raised.
2405        This function will normalize the parameter by default. It will split the string into a list with "," as the
2406        delimiter.
2407
2408        :return: empty list or list of strings
2409        """
2410
2411        return self._get_enable_addons(enable_validation=True)
2412
2413    # pylint: disable=unused-argument
2414    def _get_workspace_resource_id(
2415        self, enable_validation: bool = False, read_only: bool = False, **kwargs
2416    ) -> Union[str, None]:
2417        """Internal function to dynamically obtain the value of workspace_resource_id according to the context.
2418
2419        When workspace_resource_id is not assigned, dynamic completion will be triggerd. Function
2420        "_ensure_default_log_analytics_workspace_for_monitoring" will be called to create a workspace with
2421        subscription_id and resource_group_name, which internally used ResourceManagementClient to send the request.
2422
2423        This function supports the option of enable_validation. When enabled, it will check if workspace_resource_id is
2424        assigned but 'monitoring' is not specified in enable_addons, if so, raise a RequiredArgumentMissingError.
2425        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
2426
2427        :return: string or None
2428        """
2429        # read the original value passed by the command
2430        workspace_resource_id = self.raw_param.get("workspace_resource_id")
2431        # try to read the property value corresponding to the parameter from the `mc` object
2432        read_from_mc = False
2433        if (
2434            self.mc and
2435            self.mc.addon_profiles and
2436            CONST_MONITORING_ADDON_NAME in self.mc.addon_profiles and
2437            self.mc.addon_profiles.get(
2438                CONST_MONITORING_ADDON_NAME
2439            ).config.get(CONST_MONITORING_LOG_ANALYTICS_WORKSPACE_RESOURCE_ID) is not None
2440        ):
2441            workspace_resource_id = self.mc.addon_profiles.get(
2442                CONST_MONITORING_ADDON_NAME
2443            ).config.get(CONST_MONITORING_LOG_ANALYTICS_WORKSPACE_RESOURCE_ID)
2444            read_from_mc = True
2445
2446        # skip dynamic completion & validation if option read_only is specified
2447        if read_only:
2448            return workspace_resource_id
2449
2450        # dynamic completion
2451        if not read_from_mc:
2452            if workspace_resource_id is None:
2453                # use default workspace if exists else create default workspace
2454                workspace_resource_id = (
2455                    _ensure_default_log_analytics_workspace_for_monitoring(
2456                        self.cmd,
2457                        self.get_subscription_id(),
2458                        self.get_resource_group_name(),
2459                    )
2460                )
2461            # normalize
2462            workspace_resource_id = "/" + workspace_resource_id.strip(" /")
2463
2464        # validation
2465        if enable_validation:
2466            enable_addons = self._get_enable_addons(enable_validation=False)
2467            if workspace_resource_id and "monitoring" not in enable_addons:
2468                raise RequiredArgumentMissingError(
2469                    '"--workspace-resource-id" requires "--enable-addons monitoring".')
2470
2471        # this parameter does not need validation
2472        return workspace_resource_id
2473
2474    def get_workspace_resource_id(self) -> Union[str, None]:
2475        """Dynamically obtain the value of workspace_resource_id according to the context.
2476
2477        When workspace_resource_id is not assigned, dynamic completion will be triggerd. Function
2478        "_ensure_default_log_analytics_workspace_for_monitoring" will be called to create a workspace with
2479        subscription_id and resource_group_name, which internally used ResourceManagementClient to send the request.
2480
2481        :return: string or None
2482        """
2483
2484        return self._get_workspace_resource_id(enable_validation=True)
2485
2486    # pylint: disable=no-self-use
2487    def get_virtual_node_addon_os_type(self) -> str:
2488        """Helper function to obtain the os_type of virtual node addon.
2489
2490        Note: This is not a parameter of aks_create.
2491
2492        :return: string
2493        """
2494        return "Linux"
2495
2496    def get_aci_subnet_name(self) -> Union[str, None]:
2497        """Obtain the value of aci_subnet_name.
2498
2499        :return: string or None
2500        """
2501        # read the original value passed by the command
2502        aci_subnet_name = self.raw_param.get("aci_subnet_name")
2503        # try to read the property value corresponding to the parameter from the `mc` object
2504        if (
2505            self.mc and
2506            self.mc.addon_profiles and
2507            CONST_VIRTUAL_NODE_ADDON_NAME +
2508            self.get_virtual_node_addon_os_type()
2509            in self.mc.addon_profiles and
2510            self.mc.addon_profiles.get(
2511                CONST_VIRTUAL_NODE_ADDON_NAME +
2512                self.get_virtual_node_addon_os_type()
2513            ).config.get(CONST_VIRTUAL_NODE_SUBNET_NAME) is not None
2514        ):
2515            aci_subnet_name = self.mc.addon_profiles.get(
2516                CONST_VIRTUAL_NODE_ADDON_NAME +
2517                self.get_virtual_node_addon_os_type()
2518            ).config.get(CONST_VIRTUAL_NODE_SUBNET_NAME)
2519
2520        # this parameter does not need dynamic completion
2521        # this parameter does not need validation
2522        return aci_subnet_name
2523
2524    def get_appgw_name(self) -> Union[str, None]:
2525        """Obtain the value of appgw_name.
2526
2527        :return: string or None
2528        """
2529        # read the original value passed by the command
2530        appgw_name = self.raw_param.get("appgw_name")
2531        # try to read the property value corresponding to the parameter from the `mc` object
2532        if (
2533            self.mc and
2534            self.mc.addon_profiles and
2535            CONST_INGRESS_APPGW_ADDON_NAME in self.mc.addon_profiles and
2536            self.mc.addon_profiles.get(
2537                CONST_INGRESS_APPGW_ADDON_NAME
2538            ).config.get(CONST_INGRESS_APPGW_APPLICATION_GATEWAY_NAME) is not None
2539        ):
2540            appgw_name = self.mc.addon_profiles.get(
2541                CONST_INGRESS_APPGW_ADDON_NAME
2542            ).config.get(CONST_INGRESS_APPGW_APPLICATION_GATEWAY_NAME)
2543
2544        # this parameter does not need dynamic completion
2545        # this parameter does not need validation
2546        return appgw_name
2547
2548    def get_appgw_subnet_cidr(self) -> Union[str, None]:
2549        """Obtain the value of appgw_subnet_cidr.
2550
2551        :return: string or None
2552        """
2553        # read the original value passed by the command
2554        appgw_subnet_cidr = self.raw_param.get("appgw_subnet_cidr")
2555        # try to read the property value corresponding to the parameter from the `mc` object
2556        if (
2557            self.mc and
2558            self.mc.addon_profiles and
2559            CONST_INGRESS_APPGW_ADDON_NAME in self.mc.addon_profiles and
2560            self.mc.addon_profiles.get(
2561                CONST_INGRESS_APPGW_ADDON_NAME
2562            ).config.get(CONST_INGRESS_APPGW_SUBNET_CIDR) is not None
2563        ):
2564            appgw_subnet_cidr = self.mc.addon_profiles.get(
2565                CONST_INGRESS_APPGW_ADDON_NAME
2566            ).config.get(CONST_INGRESS_APPGW_SUBNET_CIDR)
2567
2568        # this parameter does not need dynamic completion
2569        # this parameter does not need validation
2570        return appgw_subnet_cidr
2571
2572    def get_appgw_id(self) -> Union[str, None]:
2573        """Obtain the value of appgw_id.
2574
2575        :return: string or None
2576        """
2577        # read the original value passed by the command
2578        appgw_id = self.raw_param.get("appgw_id")
2579        # try to read the property value corresponding to the parameter from the `mc` object
2580        if (
2581            self.mc and
2582            self.mc.addon_profiles and
2583            CONST_INGRESS_APPGW_ADDON_NAME in self.mc.addon_profiles and
2584            self.mc.addon_profiles.get(
2585                CONST_INGRESS_APPGW_ADDON_NAME
2586            ).config.get(CONST_INGRESS_APPGW_APPLICATION_GATEWAY_ID) is not None
2587        ):
2588            appgw_id = self.mc.addon_profiles.get(
2589                CONST_INGRESS_APPGW_ADDON_NAME
2590            ).config.get(CONST_INGRESS_APPGW_APPLICATION_GATEWAY_ID)
2591
2592        # this parameter does not need dynamic completion
2593        # this parameter does not need validation
2594        return appgw_id
2595
2596    def get_appgw_subnet_id(self) -> Union[str, None]:
2597        """Obtain the value of appgw_subnet_id.
2598
2599        :return: string or None
2600        """
2601        # read the original value passed by the command
2602        appgw_subnet_id = self.raw_param.get("appgw_subnet_id")
2603        # try to read the property value corresponding to the parameter from the `mc` object
2604        if (
2605            self.mc and
2606            self.mc.addon_profiles and
2607            CONST_INGRESS_APPGW_ADDON_NAME in self.mc.addon_profiles and
2608            self.mc.addon_profiles.get(
2609                CONST_INGRESS_APPGW_ADDON_NAME
2610            ).config.get(CONST_INGRESS_APPGW_SUBNET_ID) is not None
2611        ):
2612            appgw_subnet_id = self.mc.addon_profiles.get(
2613                CONST_INGRESS_APPGW_ADDON_NAME
2614            ).config.get(CONST_INGRESS_APPGW_SUBNET_ID)
2615
2616        # this parameter does not need dynamic completion
2617        # this parameter does not need validation
2618        return appgw_subnet_id
2619
2620    def get_appgw_watch_namespace(self) -> Union[str, None]:
2621        """Obtain the value of appgw_watch_namespace.
2622
2623        :return: string or None
2624        """
2625        # read the original value passed by the command
2626        appgw_watch_namespace = self.raw_param.get("appgw_watch_namespace")
2627        # try to read the property value corresponding to the parameter from the `mc` object
2628        if (
2629            self.mc and
2630            self.mc.addon_profiles and
2631            CONST_INGRESS_APPGW_ADDON_NAME in self.mc.addon_profiles and
2632            self.mc.addon_profiles.get(
2633                CONST_INGRESS_APPGW_ADDON_NAME
2634            ).config.get(CONST_INGRESS_APPGW_WATCH_NAMESPACE) is not None
2635        ):
2636            appgw_watch_namespace = self.mc.addon_profiles.get(
2637                CONST_INGRESS_APPGW_ADDON_NAME
2638            ).config.get(CONST_INGRESS_APPGW_WATCH_NAMESPACE)
2639
2640        # this parameter does not need dynamic completion
2641        # this parameter does not need validation
2642        return appgw_watch_namespace
2643
2644    def get_enable_sgxquotehelper(self) -> bool:
2645        """Obtain the value of enable_sgxquotehelper.
2646
2647        :return: bool
2648        """
2649        # read the original value passed by the command
2650        enable_sgxquotehelper = self.raw_param.get("enable_sgxquotehelper")
2651        # try to read the property value corresponding to the parameter from the `mc` object
2652        if (
2653            self.mc and
2654            self.mc.addon_profiles and
2655            CONST_CONFCOM_ADDON_NAME in self.mc.addon_profiles and
2656            self.mc.addon_profiles.get(
2657                CONST_CONFCOM_ADDON_NAME
2658            ).config.get(CONST_ACC_SGX_QUOTE_HELPER_ENABLED) is not None
2659        ):
2660            enable_sgxquotehelper = self.mc.addon_profiles.get(
2661                CONST_CONFCOM_ADDON_NAME
2662            ).config.get(CONST_ACC_SGX_QUOTE_HELPER_ENABLED) == "true"
2663
2664        # this parameter does not need dynamic completion
2665        # this parameter does not need validation
2666        return enable_sgxquotehelper
2667
2668    # pylint: disable=unused-argument
2669    def _get_enable_aad(self, enable_validation: bool = False, **kwargs) -> bool:
2670        """Internal function to obtain the value of enable_aad.
2671
2672        This function supports the option of enable_validation. When enabled, if the value of enable_aad is True and
2673        any of aad_client_app_id, aad_server_app_id or aad_server_app_secret is asssigned, a
2674        MutuallyExclusiveArgumentError will be raised. If the value of enable_aad is False and the value of
2675        enable_azure_rbac is True, a RequiredArgumentMissingError will be raised.
2676
2677        :return: bool
2678        """
2679        # read the original value passed by the command
2680        enable_aad = self.raw_param.get("enable_aad")
2681        # try to read the property value corresponding to the parameter from the `mc` object
2682        if (
2683            self.mc and
2684            self.mc.aad_profile and
2685            self.mc.aad_profile.managed is not None
2686        ):
2687            enable_aad = self.mc.aad_profile.managed
2688
2689        # this parameter does not need dynamic completion
2690
2691        # validation
2692        if enable_validation:
2693            (
2694                aad_client_app_id,
2695                aad_server_app_id,
2696                aad_server_app_secret,
2697            ) = (
2698                self.get_aad_client_app_id_and_aad_server_app_id_and_aad_server_app_secret()
2699            )
2700            if enable_aad:
2701                if any(
2702                    [
2703                        aad_client_app_id,
2704                        aad_server_app_id,
2705                        aad_server_app_secret,
2706                    ]
2707                ):
2708                    raise MutuallyExclusiveArgumentError(
2709                        "--enable-aad cannot be used together with --aad-client-app-id, --aad-server-app-id or "
2710                        "--aad-server-app-secret"
2711                    )
2712            if not enable_aad and self._get_enable_azure_rbac(enable_validation=False):
2713                raise RequiredArgumentMissingError(
2714                    "--enable-azure-rbac can only be used together with --enable-aad"
2715                )
2716        return enable_aad
2717
2718    def get_enable_aad(self) -> bool:
2719        """Obtain the value of enable_aad.
2720
2721        This function will verify the parameter by default. If the value of enable_aad is True and any of
2722        aad_client_app_id, aad_server_app_id or aad_server_app_secret is asssigned, a MutuallyExclusiveArgumentError
2723        will be raised. If the value of enable_aad is False and the value of enable_azure_rbac is True,
2724        a RequiredArgumentMissingError will be raised.
2725
2726        :return: bool
2727        """
2728
2729        return self._get_enable_aad(enable_validation=True)
2730
2731    def get_aad_client_app_id_and_aad_server_app_id_and_aad_server_app_secret(
2732        self,
2733    ) -> Tuple[Union[str, None], Union[str, None], Union[str, None]]:
2734        """Obtain the value of aad_client_app_id, aad_server_app_id and aad_server_app_secret.
2735
2736        This function will verify the parameters by default. If the value of enable_aad is True and any of
2737        aad_client_app_id, aad_server_app_id or aad_server_app_secret is asssigned, a MutuallyExclusiveArgumentError
2738        will be raised.
2739
2740        :return: a tuple of three elements: aad_client_app_id of string type or None, aad_server_app_id of string type
2741        or None and aad_server_app_secret of string type or None.
2742        """
2743        # get aad profile from `mc`
2744        aad_profile = None
2745        if self.mc:
2746            aad_profile = self.mc.aad_profile
2747
2748        # read the original value passed by the command
2749        aad_client_app_id = self.raw_param.get("aad_client_app_id")
2750        # try to read the property value corresponding to the parameter from the `mc` object
2751        if aad_profile and aad_profile.client_app_id is not None:
2752            aad_client_app_id = aad_profile.client_app_id
2753
2754        # read the original value passed by the command
2755        aad_server_app_id = self.raw_param.get("aad_server_app_id")
2756        # try to read the property value corresponding to the parameter from the `mc` object
2757        if aad_profile and aad_profile.server_app_id is not None:
2758            aad_server_app_id = aad_profile.server_app_id
2759
2760        # read the original value passed by the command
2761        aad_server_app_secret = self.raw_param.get("aad_server_app_secret")
2762        # try to read the property value corresponding to the parameter from the `mc` object
2763        if aad_profile and aad_profile.server_app_secret is not None:
2764            aad_server_app_secret = aad_profile.server_app_secret
2765
2766        # these parameters do not need dynamic completion
2767
2768        # validation
2769        enable_aad = self._get_enable_aad(enable_validation=False)
2770        if enable_aad:
2771            if any(
2772                [
2773                    aad_client_app_id,
2774                    aad_server_app_id,
2775                    aad_server_app_secret,
2776                ]
2777            ):
2778                raise MutuallyExclusiveArgumentError(
2779                    "--enable-aad cannot be used together with --aad-client-app-id, --aad-server-app-id or "
2780                    "--aad-server-app-secret"
2781                )
2782        return aad_client_app_id, aad_server_app_id, aad_server_app_secret
2783
2784    # pylint: disable=unused-argument
2785    def _get_aad_tenant_id(self, read_only: bool = False, **kwargs) -> Union[str, None]:
2786        """Internal function to dynamically obtain the value of aad_server_app_secret according to the context.
2787
2788        When both aad_tenant_id and enable_aad are not assigned, and any of aad_client_app_id, aad_server_app_id or
2789        aad_server_app_secret is asssigned, dynamic completion will be triggerd. Class
2790        "azure.cli.core._profile.Profile" will be instantiated, and then call its "get_login_credentials" method to
2791        get the tenant of the deployment subscription.
2792
2793        This function supports the option of read_only. When enabled, it will skip dynamic completion and validation.
2794
2795        :return: string or None
2796        """
2797        # read the original value passed by the command
2798        aad_tenant_id = self.raw_param.get("aad_tenant_id")
2799        # try to read the property value corresponding to the parameter from the `mc` object
2800        read_from_mc = False
2801        if (
2802            self.mc and
2803            self.mc.aad_profile and
2804            self.mc.aad_profile.tenant_id is not None
2805        ):
2806            aad_tenant_id = self.mc.aad_profile.tenant_id
2807            read_from_mc = True
2808
2809        # skip dynamic completion & validation if option read_only is specified
2810        if read_only:
2811            return aad_tenant_id
2812
2813        # dynamic completion
2814        if not read_from_mc and not self._get_enable_aad(
2815            enable_validation=False
2816        ):
2817            if aad_tenant_id is None and any(
2818                self.get_aad_client_app_id_and_aad_server_app_id_and_aad_server_app_secret()
2819            ):
2820                profile = Profile(cli_ctx=self.cmd.cli_ctx)
2821                _, _, aad_tenant_id = profile.get_login_credentials()
2822
2823        # this parameter does not need validation
2824        return aad_tenant_id
2825
2826    def get_aad_tenant_id(self) -> Union[str, None]:
2827        """Dynamically obtain the value of aad_server_app_secret according to the context.
2828
2829        When both aad_tenant_id and enable_aad are not assigned, and any of aad_client_app_id, aad_server_app_id or
2830        aad_server_app_secret is asssigned, dynamic completion will be triggerd. Class
2831        "azure.cli.core._profile.Profile" will be instantiated, and then call its "get_login_credentials" method to
2832        get the tenant of the deployment subscription.
2833
2834        :return: string or None
2835        """
2836
2837        return self._get_aad_tenant_id()
2838
2839    def get_aad_admin_group_object_ids(self) -> Union[List[str], None]:
2840        """Obtain the value of aad_admin_group_object_ids.
2841
2842        This function will normalize the parameter by default. It will split the string into a list with "," as the
2843        delimiter.
2844
2845        :return: empty list or list of strings, or None
2846        """
2847        # read the original value passed by the command
2848        aad_admin_group_object_ids = self.raw_param.get("aad_admin_group_object_ids")
2849        # try to read the property value corresponding to the parameter from the `mc` object
2850        read_from_mc = False
2851        if (
2852            self.mc and
2853            self.mc.aad_profile and
2854            self.mc.aad_profile.admin_group_object_i_ds is not None
2855        ):
2856            aad_admin_group_object_ids = self.mc.aad_profile.admin_group_object_i_ds
2857            read_from_mc = True
2858
2859        # keep None as None, but empty string ("") to empty list ([])
2860        if not read_from_mc and aad_admin_group_object_ids is not None:
2861            aad_admin_group_object_ids = aad_admin_group_object_ids.split(',') if aad_admin_group_object_ids else []
2862
2863        # this parameter does not need validation
2864        return aad_admin_group_object_ids
2865
2866    # pylint: disable=unused-argument
2867    def _get_disable_rbac(self, enable_validation: bool = False, **kwargs) -> Union[bool, None]:
2868        """Internal function to obtain the value of disable_rbac.
2869
2870        This function supports the option of enable_validation. When enabled, if the values of disable_rbac and
2871        enable_azure_rbac are both True, a MutuallyExclusiveArgumentError will be raised. Besides, if the values of
2872        enable_rbac and disable_rbac are both True, a MutuallyExclusiveArgumentError will be raised.
2873
2874        :return: bool or None
2875        """
2876        # read the original value passed by the command
2877        disable_rbac = self.raw_param.get("disable_rbac")
2878        # try to read the property value corresponding to the parameter from the `mc` object
2879        if (
2880            self.mc and
2881            self.mc.enable_rbac is not None
2882        ):
2883            disable_rbac = not self.mc.enable_rbac
2884
2885        # this parameter does not need dynamic completion
2886
2887        # validation
2888        if enable_validation:
2889            if disable_rbac and self._get_enable_azure_rbac(enable_validation=False):
2890                raise MutuallyExclusiveArgumentError(
2891                    "--enable-azure-rbac cannot be used together with --disable-rbac"
2892                )
2893            if disable_rbac and self.get_enable_rbac():
2894                raise MutuallyExclusiveArgumentError("specify either '--disable-rbac' or '--enable-rbac', not both.")
2895        return disable_rbac
2896
2897    def get_disable_rbac(self) -> Union[bool, None]:
2898        """Obtain the value of disable_rbac.
2899
2900        This function will verify the parameter by default. If the values of disable_rbac and enable_azure_rbac are
2901        both True, a MutuallyExclusiveArgumentError will be raised. Besides, if the values of enable_rbac and
2902        disable_rbac are both True, a MutuallyExclusiveArgumentError will be raised.
2903
2904        :return: bool or None
2905        """
2906
2907        return self._get_disable_rbac(enable_validation=True)
2908
2909    def get_enable_rbac(self) -> Union[bool, None]:
2910        """Obtain the value of enable_rbac.
2911
2912        This function will verify the parameter by default. If the values of enable_rbac and disable_rbac are both True,
2913        a MutuallyExclusiveArgumentError will be raised.
2914
2915        :return: bool or None
2916        """
2917        # read the original value passed by the command
2918        enable_rbac = self.raw_param.get("enable_rbac")
2919        # try to read the property value corresponding to the parameter from the `mc` object
2920        if (
2921            self.mc and
2922            self.mc.enable_rbac is not None
2923        ):
2924            enable_rbac = self.mc.enable_rbac
2925
2926        # this parameter does not need dynamic completion
2927
2928        # validation
2929        if enable_rbac and self._get_disable_rbac(enable_validation=False):
2930            raise MutuallyExclusiveArgumentError("specify either '--disable-rbac' or '--enable-rbac', not both.")
2931        return enable_rbac
2932
2933    # pylint: disable=unused-argument
2934    def _get_enable_azure_rbac(self, enable_validation: bool = False, **kwargs) -> bool:
2935        """Internal function to obtain the value of enable_azure_rbac.
2936
2937        This function supports the option of enable_validation. When enabled, if the values of disable_rbac and
2938        enable_azure_rbac are both True, a MutuallyExclusiveArgumentError will be raised. If the value of enable_aad
2939        is False and the value of enable_azure_rbac is True, a RequiredArgumentMissingError will be raised.
2940
2941        :return: bool
2942        """
2943        # read the original value passed by the command
2944        enable_azure_rbac = self.raw_param.get("enable_azure_rbac")
2945        # try to read the property value corresponding to the parameter from the `mc` object
2946        if (
2947            self.mc and
2948            self.mc.aad_profile and
2949            self.mc.aad_profile.enable_azure_rbac is not None
2950        ):
2951            enable_azure_rbac = self.mc.aad_profile.enable_azure_rbac
2952
2953        # this parameter does not need dynamic completion
2954
2955        # validation
2956        if enable_validation:
2957            if enable_azure_rbac and self._get_disable_rbac(enable_validation=False):
2958                raise MutuallyExclusiveArgumentError(
2959                    "--enable-azure-rbac cannot be used together with --disable-rbac"
2960                )
2961            if enable_azure_rbac and not self._get_enable_aad(enable_validation=False):
2962                raise RequiredArgumentMissingError(
2963                    "--enable-azure-rbac can only be used together with --enable-aad"
2964                )
2965        return enable_azure_rbac
2966
2967    def get_enable_azure_rbac(self) -> bool:
2968        """Obtain the value of enable_azure_rbac.
2969
2970        This function will verify the parameter by default. If the values of disable_rbac and enable_azure_rbac are
2971        both True, a MutuallyExclusiveArgumentError will be raised. If the value of enable_aad is False and the value
2972        of enable_azure_rbac is True, a RequiredArgumentMissingError will be raised.
2973
2974        :return: bool
2975        """
2976
2977        return self._get_enable_azure_rbac(enable_validation=True)
2978
2979    def get_api_server_authorized_ip_ranges(self) -> List[str]:
2980        """Obtain the value of api_server_authorized_ip_ranges.
2981
2982        This function will verify the parameter by default. When api_server_authorized_ip_ranges is assigned, if
2983        load_balancer_sku equals to "basic", raise an InvalidArgumentValueError; if enable_private_cluster is specified,
2984        raise a MutuallyExclusiveArgumentError.
2985        This function will normalize the parameter by default. It will split the string into a list with "," as the
2986        delimiter.
2987
2988        :return: empty list or list of strings
2989        """
2990        # read the original value passed by the command
2991        api_server_authorized_ip_ranges = self.raw_param.get(
2992            "api_server_authorized_ip_ranges"
2993        )
2994        # In create mode, try to read the property value corresponding to the parameter from the `mc` object.
2995        if self.decorator_mode == DecoratorMode.CREATE:
2996            read_from_mc = False
2997            if (
2998                self.mc and
2999                self.mc.api_server_access_profile and
3000                self.mc.api_server_access_profile.authorized_ip_ranges is not None
3001            ):
3002                api_server_authorized_ip_ranges = (
3003                    self.mc.api_server_access_profile.authorized_ip_ranges
3004                )
3005                read_from_mc = True
3006
3007            # normalize
3008            if not read_from_mc:
3009                api_server_authorized_ip_ranges = [
3010                    x.strip()
3011                    for x in (
3012                        api_server_authorized_ip_ranges.split(",")
3013                        if api_server_authorized_ip_ranges
3014                        else []
3015                    )
3016                ]
3017        elif self.decorator_mode == DecoratorMode.UPDATE:
3018            # normalize
3019            if api_server_authorized_ip_ranges is not None:
3020                api_server_authorized_ip_ranges = [
3021                    x.strip()
3022                    for x in (
3023                        api_server_authorized_ip_ranges.split(",")
3024                        if api_server_authorized_ip_ranges
3025                        else []
3026                    )
3027                ]
3028
3029        # validation
3030        if api_server_authorized_ip_ranges:
3031            if safe_lower(self._get_load_balancer_sku(enable_validation=False)) == "basic":
3032                raise InvalidArgumentValueError(
3033                    "--api-server-authorized-ip-ranges can only be used with standard load balancer"
3034                )
3035            if self._get_enable_private_cluster(enable_validation=False):
3036                raise MutuallyExclusiveArgumentError(
3037                    "--api-server-authorized-ip-ranges is not supported for private cluster"
3038                )
3039        return api_server_authorized_ip_ranges
3040
3041    # pylint: disable=unused-argument
3042    def _get_fqdn_subdomain(self, enable_validation: bool = False, **kwargs) -> Union[str, None]:
3043        """Internal function to obtain the value of fqdn_subdomain.
3044
3045        This function supports the option of enable_validation. When enabled, it will check if both dns_name_prefix and
3046        fqdn_subdomain are assigend, if so, raise the MutuallyExclusiveArgumentError. It will also check when both
3047        private_dns_zone and fqdn_subdomain are assigned, if the value of private_dns_zone is
3048        CONST_PRIVATE_DNS_ZONE_SYSTEM, raise an InvalidArgumentValueError; Otherwise if the value of private_dns_zone
3049        is not a valid resource ID, raise an InvalidArgumentValueError.
3050
3051        :return: string or None
3052        """
3053        # read the original value passed by the command
3054        fqdn_subdomain = self.raw_param.get("fqdn_subdomain")
3055        # try to read the property value corresponding to the parameter from the `mc` object
3056        # Backward Compatibility: We also support api version v2020.11.01 in profile 2020-09-01-hybrid and there is
3057        # no such attribute.
3058        if (
3059            self.mc and
3060            hasattr(self.mc, "fqdn_subdomain") and
3061            self.mc.fqdn_subdomain is not None
3062        ):
3063            fqdn_subdomain = self.mc.fqdn_subdomain
3064
3065        # this parameter does not need dynamic completion
3066
3067        # validation
3068        if enable_validation:
3069            if fqdn_subdomain:
3070                if self._get_dns_name_prefix(read_only=True):
3071                    raise MutuallyExclusiveArgumentError(
3072                        "--dns-name-prefix and --fqdn-subdomain cannot be used at same time"
3073                    )
3074                private_dns_zone = self.get_private_dns_zone()
3075                if private_dns_zone:
3076                    if private_dns_zone.lower() != CONST_PRIVATE_DNS_ZONE_SYSTEM:
3077                        if not is_valid_resource_id(private_dns_zone):
3078                            raise InvalidArgumentValueError(
3079                                private_dns_zone + " is not a valid Azure resource ID."
3080                            )
3081                    else:
3082                        raise InvalidArgumentValueError(
3083                            "--fqdn-subdomain should only be used for private cluster with custom private dns zone"
3084                        )
3085        return fqdn_subdomain
3086
3087    def get_fqdn_subdomain(self) -> Union[str, None]:
3088        """Obtain the value of fqdn_subdomain.
3089
3090        This function will verify the parameter by default. It will check if both dns_name_prefix and fqdn_subdomain
3091        are assigend, if so, raise the MutuallyExclusiveArgumentError. It will also check when both private_dns_zone
3092        and fqdn_subdomain are assigned, if the value of private_dns_zone is CONST_PRIVATE_DNS_ZONE_SYSTEM, raise an
3093        InvalidArgumentValueError; Otherwise if the value of private_dns_zone is not a valid resource ID, raise an
3094        InvalidArgumentValueError.
3095
3096        :return: string or None
3097        """
3098
3099        return self._get_fqdn_subdomain(enable_validation=True)
3100
3101    # pylint: disable=unused-argument
3102    def _get_enable_private_cluster(self, enable_validation: bool = False, **kwargs) -> bool:
3103        """Internal function to obtain the value of enable_private_cluster.
3104
3105        This function supports the option of enable_validation. When enabled and enable_private_cluster is specified,
3106        if load_balancer_sku equals to basic, raise an InvalidArgumentValueError; if api_server_authorized_ip_ranges
3107        is assigned, raise an MutuallyExclusiveArgumentError; Otherwise when enable_private_cluster is not specified
3108        and disable_public_fqdn or private_dns_zone is assigned, raise an InvalidArgumentValueError.
3109
3110        :return: bool
3111        """
3112        # read the original value passed by the command
3113        enable_private_cluster = self.raw_param.get("enable_private_cluster")
3114        # try to read the property value corresponding to the parameter from the `mc` object
3115        if (
3116            self.mc and
3117            self.mc.api_server_access_profile and
3118            self.mc.api_server_access_profile.enable_private_cluster is not None
3119        ):
3120            enable_private_cluster = self.mc.api_server_access_profile.enable_private_cluster
3121
3122        # this parameter does not need dynamic completion
3123
3124        # validation
3125        if enable_validation:
3126            if enable_private_cluster:
3127                if safe_lower(self._get_load_balancer_sku(enable_validation=False)) == "basic":
3128                    raise InvalidArgumentValueError(
3129                        "Please use standard load balancer for private cluster"
3130                    )
3131                if self.get_api_server_authorized_ip_ranges():
3132                    raise MutuallyExclusiveArgumentError(
3133                        "--api-server-authorized-ip-ranges is not supported for private cluster"
3134                    )
3135            else:
3136                if self.get_disable_public_fqdn():
3137                    raise InvalidArgumentValueError(
3138                        "--disable-public-fqdn should only be used with --enable-private-cluster"
3139                    )
3140                if self.get_private_dns_zone():
3141                    raise InvalidArgumentValueError(
3142                        "Invalid private dns zone for public cluster. It should always be empty for public cluster"
3143                    )
3144        return enable_private_cluster
3145
3146    def get_enable_private_cluster(self) -> bool:
3147        """Obtain the value of enable_private_cluster.
3148
3149        This function will verify the parameter by default. When enable_private_cluster is specified, if
3150        load_balancer_sku equals to basic, raise an InvalidArgumentValueError; if api_server_authorized_ip_ranges
3151        is assigned, raise an MutuallyExclusiveArgumentError; Otherwise when enable_private_cluster is not specified
3152        and disable_public_fqdn or private_dns_zone is assigned, raise an InvalidArgumentValueError.
3153
3154        :return: bool
3155        """
3156
3157        return self._get_enable_private_cluster(enable_validation=True)
3158
3159    def get_disable_public_fqdn(self) -> bool:
3160        """Obtain the value of disable_public_fqdn.
3161
3162        This function will verify the parameter by default. If enable_private_cluster is not specified and
3163        disable_public_fqdn is assigned, raise an InvalidArgumentValueError.
3164
3165        :return: bool
3166        """
3167        # read the original value passed by the command
3168        disable_public_fqdn = self.raw_param.get("disable_public_fqdn")
3169        # try to read the property value corresponding to the parameter from the `mc` object
3170        if (
3171            self.mc and
3172            self.mc.api_server_access_profile and
3173            self.mc.api_server_access_profile.enable_private_cluster_public_fqdn is not None
3174        ):
3175            disable_public_fqdn = not self.mc.api_server_access_profile.enable_private_cluster_public_fqdn
3176
3177        # this parameter does not need dynamic completion
3178
3179        # validation
3180        enable_private_cluster = self._get_enable_private_cluster(enable_validation=False)
3181        if disable_public_fqdn and not enable_private_cluster:
3182            raise InvalidArgumentValueError("--disable-public-fqdn should only be used with --enable-private-cluster")
3183        return disable_public_fqdn
3184
3185    def get_private_dns_zone(self) -> Union[str, None]:
3186        """Obtain the value of private_dns_zone.
3187
3188        This function will verify the parameter by default. When private_dns_zone is assigned, if enable_private_cluster
3189        is not specified raise an InvalidArgumentValueError. It will also check when both private_dns_zone and
3190        fqdn_subdomain are assigned, if the value of private_dns_zone is CONST_PRIVATE_DNS_ZONE_SYSTEM, raise an
3191        InvalidArgumentValueError; Otherwise if the value of private_dns_zone is not a valid resource ID, raise an
3192        InvalidArgumentValueError.
3193
3194        :return: string or None
3195        """
3196        # read the original value passed by the command
3197        private_dns_zone = self.raw_param.get("private_dns_zone")
3198        # try to read the property value corresponding to the parameter from the `mc` object
3199        if (
3200            self.mc and
3201            self.mc.api_server_access_profile and
3202            self.mc.api_server_access_profile.private_dns_zone is not None
3203        ):
3204            private_dns_zone = self.mc.api_server_access_profile.private_dns_zone
3205
3206        # this parameter does not need dynamic completion
3207
3208        # validation
3209        if private_dns_zone:
3210            if not self._get_enable_private_cluster(enable_validation=False):
3211                raise InvalidArgumentValueError(
3212                    "Invalid private dns zone for public cluster. It should always be empty for public cluster"
3213                )
3214            if private_dns_zone.lower() != CONST_PRIVATE_DNS_ZONE_SYSTEM:
3215                if not is_valid_resource_id(private_dns_zone):
3216                    raise InvalidArgumentValueError(
3217                        private_dns_zone + " is not a valid Azure resource ID."
3218                    )
3219            else:
3220                if self._get_fqdn_subdomain(enable_validation=False):
3221                    raise InvalidArgumentValueError(
3222                        "--fqdn-subdomain should only be used for private cluster with custom private dns zone"
3223                    )
3224        return private_dns_zone
3225
3226    def get_assign_kubelet_identity(self) -> Union[str, None]:
3227        """Obtain the value of assign_kubelet_identity.
3228
3229        This function will verify the parameter by default. If assign_identity is not assigned but
3230        assign_kubelet_identity is, a RequiredArgumentMissingError will be raised.
3231
3232        :return: string or None
3233        """
3234        # read the original value passed by the command
3235        assign_kubelet_identity = self.raw_param.get("assign_kubelet_identity")
3236        # try to read the property value corresponding to the parameter from the `mc` object
3237        if (
3238            self.mc and
3239            self.mc.identity_profile and
3240            self.mc.identity_profile.get("kubeletidentity", None) and
3241            getattr(self.mc.identity_profile.get("kubeletidentity"), "resource_id") is not None
3242        ):
3243            assign_kubelet_identity = getattr(self.mc.identity_profile.get("kubeletidentity"), "resource_id")
3244
3245        # this parameter does not need dynamic completion
3246
3247        # validation
3248        if assign_kubelet_identity and not self._get_assign_identity(enable_validation=False):
3249            raise RequiredArgumentMissingError(
3250                "--assign-kubelet-identity can only be specified when --assign-identity is specified"
3251            )
3252        return assign_kubelet_identity
3253
3254    def get_auto_upgrade_channel(self) -> Union[str, None]:
3255        """Obtain the value of auto_upgrade_channel.
3256
3257        :return: string or None
3258        """
3259        # read the original value passed by the command
3260        auto_upgrade_channel = self.raw_param.get("auto_upgrade_channel")
3261        # try to read the property value corresponding to the parameter from the `mc` object
3262        if (
3263            self.mc and
3264            self.mc.auto_upgrade_profile and
3265            self.mc.auto_upgrade_profile.upgrade_channel is not None
3266        ):
3267            auto_upgrade_channel = self.mc.auto_upgrade_profile.upgrade_channel
3268
3269        # this parameter does not need dynamic completion
3270        # this parameter does not need validation
3271        return auto_upgrade_channel
3272
3273    def get_node_osdisk_diskencryptionset_id(self) -> Union[str, None]:
3274        """Obtain the value of node_osdisk_diskencryptionset_id.
3275
3276        :return: string or None
3277        """
3278        # read the original value passed by the command
3279        node_osdisk_diskencryptionset_id = self.raw_param.get("node_osdisk_diskencryptionset_id")
3280        # try to read the property value corresponding to the parameter from the `mc` object
3281        if (
3282            self.mc and
3283            self.mc.disk_encryption_set_id is not None
3284        ):
3285            node_osdisk_diskencryptionset_id = self.mc.disk_encryption_set_id
3286
3287        # this parameter does not need dynamic completion
3288        # this parameter does not need validation
3289        return node_osdisk_diskencryptionset_id
3290
3291    def get_cluster_autoscaler_profile(self) -> Union[Dict[str, str], None]:
3292        """Dynamically obtain the value of cluster_autoscaler_profile according to the context.
3293
3294        In update mode, when cluster_autoscaler_profile is assigned and auto_scaler_profile in the `mc` object has also
3295        been set, dynamic completion will be triggerd. We will first make a copy of the original configuration
3296        (extract the dictionary from the ManagedClusterPropertiesAutoScalerProfile object), and then update the copied
3297        dictionary with the dictionary of new options.
3298
3299        :return: dictionary or None
3300        """
3301        # read the original value passed by the command
3302        cluster_autoscaler_profile = self.raw_param.get("cluster_autoscaler_profile")
3303        # In create mode, try to read the property value corresponding to the parameter from the `mc` object
3304        if self.decorator_mode == DecoratorMode.CREATE:
3305            if self.mc and self.mc.auto_scaler_profile is not None:
3306                cluster_autoscaler_profile = self.mc.auto_scaler_profile
3307
3308        # dynamic completion
3309        if self.decorator_mode == DecoratorMode.UPDATE:
3310            if cluster_autoscaler_profile and self.mc and self.mc.auto_scaler_profile:
3311                # shallow copy should be enough for string-to-string dictionary
3312                copy_of_raw_dict = self.mc.auto_scaler_profile.__dict__.copy()
3313                new_options_dict = dict(
3314                    (key.replace("-", "_"), value)
3315                    for (key, value) in cluster_autoscaler_profile.items()
3316                )
3317                copy_of_raw_dict.update(new_options_dict)
3318                cluster_autoscaler_profile = copy_of_raw_dict
3319
3320        # this parameter does not need validation
3321        return cluster_autoscaler_profile
3322
3323    def get_uptime_sla(self) -> bool:
3324        """Obtain the value of uptime_sla.
3325
3326        This function will verify the parameter by default. If both uptime_sla and no_uptime_sla are specified, raise
3327        a MutuallyExclusiveArgumentError.
3328
3329        :return: bool
3330        """
3331        # read the original value passed by the command
3332        uptime_sla = self.raw_param.get("uptime_sla")
3333        # In create mode, try to read the property value corresponding to the parameter from the `mc` object
3334        if self.decorator_mode == DecoratorMode.CREATE:
3335            if (
3336                self.mc and
3337                self.mc.sku and
3338                self.mc.sku.tier is not None
3339            ):
3340                uptime_sla = self.mc.sku.tier == "Paid"
3341
3342        # this parameter does not need dynamic completion
3343
3344        # validation
3345        if uptime_sla and self._get_no_uptime_sla(enable_validation=False):
3346            raise MutuallyExclusiveArgumentError(
3347                'Cannot specify "--uptime-sla" and "--no-uptime-sla" at the same time.'
3348            )
3349        return uptime_sla
3350
3351    # pylint: disable=unused-argument
3352    def _get_no_uptime_sla(self, enable_validation: bool = False, **kwargs) -> bool:
3353        """Internal function to obtain the value of no_uptime_sla.
3354
3355        This function supports the option of enable_validation. When enabled, if both uptime_sla and no_uptime_sla are
3356        specified, raise a MutuallyExclusiveArgumentError.
3357
3358        :return: bool
3359        """
3360        # read the original value passed by the command
3361        no_uptime_sla = self.raw_param.get("no_uptime_sla")
3362
3363        # this parameter does not need dynamic completion
3364
3365        # validation
3366        if enable_validation:
3367            if no_uptime_sla and self.get_uptime_sla():
3368                raise MutuallyExclusiveArgumentError(
3369                    'Cannot specify "--uptime-sla" and "--no-uptime-sla" at the same time.'
3370                )
3371        return no_uptime_sla
3372
3373    def get_no_uptime_sla(self) -> bool:
3374        """Obtain the value of no_uptime_sla.
3375
3376        This function will verify the parameter by default. If both uptime_sla and no_uptime_sla are specified, raise
3377        a MutuallyExclusiveArgumentError.
3378
3379        :return: bool
3380        """
3381
3382        return self._get_no_uptime_sla(enable_validation=True)
3383
3384    def get_tags(self) -> Union[Dict[str, str], None]:
3385        """Obtain the value of tags.
3386
3387        :return: dictionary or None
3388        """
3389        # read the original value passed by the command
3390        tags = self.raw_param.get("tags")
3391        # In create mode, try to read the property value corresponding to the parameter from the `mc` object
3392        if self.decorator_mode == DecoratorMode.CREATE:
3393            if self.mc and self.mc.tags is not None:
3394                tags = self.mc.tags
3395
3396        # this parameter does not need dynamic completion
3397        # this parameter does not need validation
3398        return tags
3399
3400    def get_edge_zone(self) -> Union[str, None]:
3401        """Obtain the value of edge_zone.
3402
3403        :return: string or None
3404        """
3405        # read the original value passed by the command
3406        edge_zone = self.raw_param.get("edge_zone")
3407        # try to read the property value corresponding to the parameter from the `mc` object
3408        # Backward Compatibility: We also support api version v2020.11.01 in profile 2020-09-01-hybrid and there is
3409        # no such attribute.
3410        if (
3411            self.mc and
3412            hasattr(self.mc, "extended_location") and
3413            self.mc.extended_location and
3414            self.mc.extended_location.name is not None
3415        ):
3416            edge_zone = self.mc.extended_location.name
3417
3418        # this parameter does not need dynamic completion
3419        # this parameter does not need validation
3420        return edge_zone
3421
3422    def get_disable_local_accounts(self) -> bool:
3423        """Obtain the value of disable_local_accounts.
3424
3425        :return: bool
3426        """
3427        # read the original value passed by the command
3428        disable_local_accounts = self.raw_param.get("disable_local_accounts")
3429        # this parameter does not need dynamic completion
3430        # this parameter does not need validation
3431        return disable_local_accounts
3432
3433    def get_client_id_from_identity_or_sp_profile(self) -> str:
3434        """Helper function to obtain the value of client_id from identity_profile or service_principal_profile.
3435
3436        Note: This is not a parameter of aks_update, and it will not be decorated into the `mc` object.
3437
3438        If client_id cannot be obtained, raise an UnknownError.
3439
3440        :return: string
3441        """
3442        client_id = None
3443        if check_is_msi_cluster(self.mc):
3444            if self.mc.identity_profile is None or self.mc.identity_profile["kubeletidentity"] is None:
3445                raise UnknownError(
3446                    "Unexpected error getting kubelet's identity for the cluster. "
3447                    "Please do not set --attach-acr or --detach-acr. "
3448                    "You can manually grant or revoke permission to the identity named "
3449                    "<ClUSTER_NAME>-agentpool in MC_ resource group to access ACR."
3450                )
3451            client_id = self.mc.identity_profile["kubeletidentity"].client_id
3452        elif self.mc and self.mc.service_principal_profile is not None:
3453            client_id = self.mc.service_principal_profile.client_id
3454
3455        if not client_id:
3456            raise UnknownError('Cannot get the AKS cluster\'s service principal.')
3457        return client_id
3458
3459
3460class AKSCreateDecorator:
3461    def __init__(
3462        self,
3463        cmd: AzCliCommand,
3464        client: ContainerServiceClient,
3465        models: AKSModels,
3466        raw_parameters: Dict,
3467        resource_type: ResourceType = ResourceType.MGMT_CONTAINERSERVICE,
3468    ):
3469        """Internal controller of aks_create.
3470
3471        Break down the all-in-one aks_create function into several relatively independent functions (some of them have
3472        a certain order dependency) that only focus on a specific profile or process a specific piece of logic.
3473        In addition, an overall control function is provided. By calling the aforementioned independent functions one
3474        by one, a complete ManagedCluster object is gradually decorated and finally requests are sent to create a
3475        cluster.
3476        """
3477        self.cmd = cmd
3478        self.client = client
3479        self.models = models
3480        # store the context in the process of assemble the ManagedCluster object
3481        self.context = AKSContext(cmd, raw_parameters, decorator_mode=DecoratorMode.CREATE)
3482        # `resource_type` is used to dynamically find the model (of a specific api version) provided by the
3483        # containerservice SDK, most models have been passed through the `models` parameter (instantiatied
3484        # from `AKSModels` (or `PreviewAKSModels` in aks-preview), where resource_type (i.e.,
3485        # api version) has been specified).
3486        self.resource_type = resource_type
3487
3488    def init_mc(self) -> ManagedCluster:
3489        """Initialize a ManagedCluster object with several parameters and attach it to internal context.
3490
3491        When location is not assigned, function "_get_rg_location" will be called to get the location of the provided
3492        resource group, which internally used ResourceManagementClient to send the request.
3493
3494        :return: the ManagedCluster object
3495        """
3496        # Initialize a ManagedCluster object with mandatory parameter location and optional parameters tags, dns_prefix,
3497        # kubernetes_version, disable_rbac, node_osdisk_diskencryptionset_id, disable_local_accounts.
3498        mc = self.models.ManagedCluster(
3499            location=self.context.get_location(),
3500            tags=self.context.get_tags(),
3501            dns_prefix=self.context.get_dns_name_prefix(),
3502            kubernetes_version=self.context.get_kubernetes_version(),
3503            enable_rbac=not self.context.get_disable_rbac(),
3504            disk_encryption_set_id=self.context.get_node_osdisk_diskencryptionset_id(),
3505            disable_local_accounts=self.context.get_disable_local_accounts(),
3506        )
3507
3508        # attach mc to AKSContext
3509        self.context.attach_mc(mc)
3510        return mc
3511
3512    def set_up_agent_pool_profiles(self, mc: ManagedCluster) -> ManagedCluster:
3513        """Set up agent pool profiles for the ManagedCluster object.
3514
3515        :return: the ManagedCluster object
3516        """
3517        if not isinstance(mc, self.models.ManagedCluster):
3518            raise CLIInternalError(
3519                "Unexpected mc object with type '{}'.".format(type(mc))
3520            )
3521
3522        (
3523            node_count,
3524            enable_auto_scaling,
3525            min_count,
3526            max_count,
3527        ) = (
3528            self.context.get_node_count_and_enable_cluster_autoscaler_and_min_count_and_max_count()
3529        )
3530        agent_pool_profile = self.models.ManagedClusterAgentPoolProfile(
3531            # Must be 12 chars or less before ACS RP adds to it
3532            name=self.context.get_nodepool_name(),
3533            tags=self.context.get_nodepool_tags(),
3534            node_labels=self.context.get_nodepool_labels(),
3535            count=node_count,
3536            vm_size=self.context.get_node_vm_size(),
3537            os_type="Linux",
3538            vnet_subnet_id=self.context.get_vnet_subnet_id(),
3539            proximity_placement_group_id=self.context.get_ppg(),
3540            availability_zones=self.context.get_zones(),
3541            enable_node_public_ip=self.context.get_enable_node_public_ip(),
3542            node_public_ip_prefix_id=self.context.get_node_public_ip_prefix_id(),
3543            enable_encryption_at_host=self.context.get_enable_encryption_at_host(),
3544            enable_ultra_ssd=self.context.get_enable_ultra_ssd(),
3545            max_pods=self.context.get_max_pods(),
3546            type=self.context.get_vm_set_type(),
3547            mode="System",
3548            os_disk_size_gb=self.context.get_node_osdisk_size(),
3549            os_disk_type=self.context.get_node_osdisk_type(),
3550            min_count=min_count,
3551            max_count=max_count,
3552            enable_auto_scaling=enable_auto_scaling,
3553        )
3554        mc.agent_pool_profiles = [agent_pool_profile]
3555        return mc
3556
3557    def set_up_linux_profile(self, mc: ManagedCluster) -> ManagedCluster:
3558        """Set up linux profile for the ManagedCluster object.
3559
3560        Linux profile is just used for SSH access to VMs, so it will be omitted if --no-ssh-key option was specified.
3561
3562        :return: the ManagedCluster object
3563        """
3564        if not isinstance(mc, self.models.ManagedCluster):
3565            raise CLIInternalError(
3566                "Unexpected mc object with type '{}'.".format(type(mc))
3567            )
3568
3569        ssh_key_value, no_ssh_key = self.context.get_ssh_key_value_and_no_ssh_key()
3570        if not no_ssh_key:
3571            ssh_config = self.models.ContainerServiceSshConfiguration(
3572                public_keys=[
3573                    self.models.ContainerServiceSshPublicKey(
3574                        key_data=ssh_key_value
3575                    )
3576                ]
3577            )
3578            linux_profile = self.models.ContainerServiceLinuxProfile(
3579                admin_username=self.context.get_admin_username(), ssh=ssh_config
3580            )
3581            mc.linux_profile = linux_profile
3582        return mc
3583
3584    def set_up_windows_profile(self, mc: ManagedCluster) -> ManagedCluster:
3585        """Set up windows profile for the ManagedCluster object.
3586
3587        :return: the ManagedCluster object
3588        """
3589        if not isinstance(mc, self.models.ManagedCluster):
3590            raise CLIInternalError(
3591                "Unexpected mc object with type '{}'.".format(type(mc))
3592            )
3593
3594        (
3595            windows_admin_username,
3596            windows_admin_password,
3597        ) = self.context.get_windows_admin_username_and_password()
3598        if windows_admin_username or windows_admin_password:
3599            windows_license_type = None
3600            if self.context.get_enable_ahub():
3601                windows_license_type = "Windows_Server"
3602
3603            # this would throw an error if windows_admin_username is empty (the user enters an empty
3604            # string after being prompted), since admin_username is a required parameter
3605            windows_profile = self.models.ManagedClusterWindowsProfile(
3606                admin_username=windows_admin_username,
3607                admin_password=windows_admin_password,
3608                license_type=windows_license_type,
3609            )
3610
3611            mc.windows_profile = windows_profile
3612        return mc
3613
3614    def set_up_service_principal_profile(self, mc: ManagedCluster) -> ManagedCluster:
3615        """Set up service principal profile for the ManagedCluster object.
3616
3617        The function "_ensure_aks_service_principal" will be called if the user provides an incomplete sp and secret
3618        pair, which internally used GraphRbacManagementClient to send the request to create sp.
3619
3620        :return: the ManagedCluster object
3621        """
3622        if not isinstance(mc, self.models.ManagedCluster):
3623            raise CLIInternalError(
3624                "Unexpected mc object with type '{}'.".format(type(mc))
3625            )
3626
3627        # If customer explicitly provide a service principal, disable managed identity.
3628        (
3629            service_principal,
3630            client_secret,
3631        ) = self.context.get_service_principal_and_client_secret()
3632        enable_managed_identity = self.context.get_enable_managed_identity()
3633        # Skip create service principal profile for the cluster if the cluster enables managed identity
3634        # and customer doesn't explicitly provide a service principal.
3635        if not (
3636            enable_managed_identity and
3637            not service_principal and
3638            not client_secret
3639        ):
3640            service_principal_profile = (
3641                self.models.ManagedClusterServicePrincipalProfile(
3642                    client_id=service_principal, secret=client_secret
3643                )
3644            )
3645            mc.service_principal_profile = service_principal_profile
3646        return mc
3647
3648    def process_add_role_assignment_for_vnet_subnet(self, mc: ManagedCluster) -> None:
3649        """Add role assignment for vent subnet.
3650
3651        This function will store an intermediate need_post_creation_vnet_permission_granting.
3652
3653        The function "subnet_role_assignment_exists" will be called to verify if the role assignment already exists for
3654        the subnet, which internally used AuthorizationManagementClient to send the request.
3655        The wrapper function "get_identity_by_msi_client" will be called by "get_user_assigned_identity_client_id" to
3656        get the identity object, which internally use ManagedServiceIdentityClient to send the request.
3657        The function "_add_role_assignment" will be called to add role assignment for the subnet, which internally used
3658        AuthorizationManagementClient to send the request.
3659
3660        :return: None
3661        """
3662        if not isinstance(mc, self.models.ManagedCluster):
3663            raise CLIInternalError(
3664                "Unexpected mc object with type '{}'.".format(type(mc))
3665            )
3666
3667        need_post_creation_vnet_permission_granting = False
3668        vnet_subnet_id = self.context.get_vnet_subnet_id()
3669        skip_subnet_role_assignment = (
3670            self.context.get_skip_subnet_role_assignment()
3671        )
3672        if (
3673            vnet_subnet_id and
3674            not skip_subnet_role_assignment and
3675            not subnet_role_assignment_exists(self.cmd, vnet_subnet_id)
3676        ):
3677            # if service_principal_profile is None, then this cluster is an MSI cluster,
3678            # and the service principal does not exist. Two cases:
3679            # 1. For system assigned identity, we just tell user to grant the
3680            # permission after the cluster is created to keep consistent with portal experience.
3681            # 2. For user assigned identity, we can grant needed permission to
3682            # user provided user assigned identity before creating managed cluster.
3683            service_principal_profile = mc.service_principal_profile
3684            assign_identity = self.context.get_assign_identity()
3685            if service_principal_profile is None and not assign_identity:
3686                msg = (
3687                    "It is highly recommended to use USER assigned identity "
3688                    "(option --assign-identity) when you want to bring your own"
3689                    "subnet, which will have no latency for the role assignment to "
3690                    "take effect. When using SYSTEM assigned identity, "
3691                    "azure-cli will grant Network Contributor role to the "
3692                    "system assigned identity after the cluster is created, and "
3693                    "the role assignment will take some time to take effect, see "
3694                    "https://docs.microsoft.com/azure/aks/use-managed-identity, "
3695                    "proceed to create cluster with system assigned identity?"
3696                )
3697                if not self.context.get_yes() and not prompt_y_n(
3698                    msg, default="n"
3699                ):
3700                    return None
3701                need_post_creation_vnet_permission_granting = True
3702            else:
3703                scope = vnet_subnet_id
3704                identity_client_id = ""
3705                if assign_identity:
3706                    identity_client_id = (
3707                        self.context.get_user_assigned_identity_client_id()
3708                    )
3709                else:
3710                    identity_client_id = service_principal_profile.client_id
3711                if not _add_role_assignment(
3712                    self.cmd,
3713                    "Network Contributor",
3714                    identity_client_id,
3715                    scope=scope,
3716                ):
3717                    logger.warning(
3718                        "Could not create a role assignment for subnet. Are you an Owner on this subscription?"
3719                    )
3720        # store need_post_creation_vnet_permission_granting as an intermediate
3721        self.context.set_intermediate(
3722            "need_post_creation_vnet_permission_granting",
3723            need_post_creation_vnet_permission_granting,
3724            overwrite_exists=True,
3725        )
3726
3727    def process_attach_acr(self, mc: ManagedCluster) -> None:
3728        """Attach acr for the cluster.
3729
3730        The function "_ensure_aks_acr" will be called to create an AcrPull role assignment for the acr, which
3731        internally used AuthorizationManagementClient to send the request.
3732
3733        :return: None
3734        """
3735        if not isinstance(mc, self.models.ManagedCluster):
3736            raise CLIInternalError(
3737                "Unexpected mc object with type '{}'.".format(type(mc))
3738            )
3739
3740        attach_acr = self.context.get_attach_acr()
3741        if attach_acr:
3742            # If enable_managed_identity, attach acr operation will be handled after the cluster is created
3743            if not self.context.get_enable_managed_identity():
3744                service_principal_profile = mc.service_principal_profile
3745                _ensure_aks_acr(
3746                    self.cmd,
3747                    client_id=service_principal_profile.client_id,
3748                    acr_name_or_id=attach_acr,
3749                    # not actually used
3750                    subscription_id=self.context.get_subscription_id(),
3751                )
3752
3753    def set_up_network_profile(self, mc: ManagedCluster) -> ManagedCluster:
3754        """Set up network profile for the ManagedCluster object.
3755
3756        Build load balancer profile, verify outbound type and load balancer sku first, then set up network profile.
3757
3758        :return: the ManagedCluster object
3759        """
3760        if not isinstance(mc, self.models.ManagedCluster):
3761            raise CLIInternalError(
3762                "Unexpected mc object with type '{}'.".format(type(mc))
3763            )
3764
3765        # build load balancer profile, which is part of the network profile
3766        load_balancer_profile = create_load_balancer_profile(
3767            self.context.get_load_balancer_managed_outbound_ip_count(),
3768            self.context.get_load_balancer_outbound_ips(),
3769            self.context.get_load_balancer_outbound_ip_prefixes(),
3770            self.context.get_load_balancer_outbound_ports(),
3771            self.context.get_load_balancer_idle_timeout(),
3772            models=self.models.lb_models,
3773        )
3774
3775        # verify outbound type
3776        # Note: Validation internally depends on load_balancer_sku, which is a temporary value that is
3777        # dynamically completed.
3778        outbound_type = self.context.get_outbound_type(
3779            load_balancer_profile=load_balancer_profile
3780        )
3781
3782        # verify load balancer sku
3783        load_balancer_sku = safe_lower(self.context.get_load_balancer_sku())
3784
3785        # verify network_plugin, pod_cidr, service_cidr, dns_service_ip, docker_bridge_address, network_policy
3786        network_plugin = self.context.get_network_plugin()
3787        (
3788            pod_cidr,
3789            service_cidr,
3790            dns_service_ip,
3791            docker_bridge_address,
3792            network_policy,
3793        ) = (
3794            self.context.get_pod_cidr_and_service_cidr_and_dns_service_ip_and_docker_bridge_address_and_network_policy()
3795        )
3796        network_profile = None
3797        if any(
3798            [
3799                network_plugin,
3800                pod_cidr,
3801                service_cidr,
3802                dns_service_ip,
3803                docker_bridge_address,
3804                network_policy,
3805            ]
3806        ):
3807            # Attention: RP would return UnexpectedLoadBalancerSkuForCurrentOutboundConfiguration internal server error
3808            # if load_balancer_sku is set to basic and load_balancer_profile is assigned.
3809            # Attention: SDK provides default values for pod_cidr, service_cidr, dns_service_ip, docker_bridge_cidr
3810            # and outbound_type, and they might be overwritten to None.
3811            network_profile = self.models.ContainerServiceNetworkProfile(
3812                network_plugin=network_plugin,
3813                pod_cidr=pod_cidr,
3814                service_cidr=service_cidr,
3815                dns_service_ip=dns_service_ip,
3816                docker_bridge_cidr=docker_bridge_address,
3817                network_policy=network_policy,
3818                load_balancer_sku=load_balancer_sku,
3819                load_balancer_profile=load_balancer_profile,
3820                outbound_type=outbound_type,
3821            )
3822        else:
3823            if load_balancer_sku == "standard" or load_balancer_profile:
3824                network_profile = self.models.ContainerServiceNetworkProfile(
3825                    network_plugin="kubenet",
3826                    load_balancer_sku=load_balancer_sku,
3827                    load_balancer_profile=load_balancer_profile,
3828                    outbound_type=outbound_type,
3829                )
3830            if load_balancer_sku == "basic":
3831                # load balancer sku must be standard when load balancer profile is provided
3832                network_profile = self.models.ContainerServiceNetworkProfile(
3833                    load_balancer_sku=load_balancer_sku,
3834                )
3835        mc.network_profile = network_profile
3836        return mc
3837
3838    # pylint: disable=too-many-statements
3839    def set_up_addon_profiles(self, mc: ManagedCluster) -> ManagedCluster:
3840        """Set up addon profiles for the ManagedCluster object.
3841
3842        This function will store following intermediates: monitoring, enable_virtual_node and
3843        ingress_appgw_addon_enabled.
3844
3845        The function "_ensure_container_insights_for_monitoring" will be called to create a deployment which publishes
3846        the Container Insights solution to the Log Analytics workspace.
3847        When workspace_resource_id is not assigned, function "_ensure_default_log_analytics_workspace_for_monitoring"
3848        will be called to create a workspace, which internally used ResourceManagementClient to send the request.
3849
3850        :return: the ManagedCluster object
3851        """
3852        if not isinstance(mc, self.models.ManagedCluster):
3853            raise CLIInternalError(
3854                "Unexpected mc object with type '{}'.".format(type(mc))
3855            )
3856
3857        ManagedClusterAddonProfile = self.models.ManagedClusterAddonProfile
3858        addon_profiles = {}
3859        # error out if any unrecognized or duplicate addon provided
3860        # error out if '--enable-addons=monitoring' isn't set but workspace_resource_id is
3861        # error out if '--enable-addons=virtual-node' is set but aci_subnet_name and vnet_subnet_id are not
3862        addons = self.context.get_enable_addons()
3863        if 'http_application_routing' in addons:
3864            addon_profiles[CONST_HTTP_APPLICATION_ROUTING_ADDON_NAME] = ManagedClusterAddonProfile(
3865                enabled=True)
3866            addons.remove('http_application_routing')
3867        if 'kube-dashboard' in addons:
3868            addon_profiles[CONST_KUBE_DASHBOARD_ADDON_NAME] = ManagedClusterAddonProfile(
3869                enabled=True)
3870            addons.remove('kube-dashboard')
3871        # TODO: can we help the user find a workspace resource ID?
3872        if 'monitoring' in addons:
3873            workspace_resource_id = self.context.get_workspace_resource_id()
3874            addon_profiles[CONST_MONITORING_ADDON_NAME] = ManagedClusterAddonProfile(
3875                enabled=True, config={CONST_MONITORING_LOG_ANALYTICS_WORKSPACE_RESOURCE_ID: workspace_resource_id})
3876            # post-process, create a deployment
3877            _ensure_container_insights_for_monitoring(self.cmd, addon_profiles[CONST_MONITORING_ADDON_NAME])
3878            # set intermediate
3879            self.context.set_intermediate("monitoring", True, overwrite_exists=True)
3880            addons.remove('monitoring')
3881        if 'azure-policy' in addons:
3882            addon_profiles[CONST_AZURE_POLICY_ADDON_NAME] = ManagedClusterAddonProfile(
3883                enabled=True)
3884            addons.remove('azure-policy')
3885        if 'virtual-node' in addons:
3886            aci_subnet_name = self.context.get_aci_subnet_name()
3887            # TODO: how about aciConnectorwindows, what is its addon name?
3888            os_type = self.context.get_virtual_node_addon_os_type()
3889            addon_profiles[CONST_VIRTUAL_NODE_ADDON_NAME + os_type] = ManagedClusterAddonProfile(
3890                enabled=True,
3891                config={CONST_VIRTUAL_NODE_SUBNET_NAME: aci_subnet_name}
3892            )
3893            # set intermediate
3894            self.context.set_intermediate("enable_virtual_node", True, overwrite_exists=True)
3895            addons.remove('virtual-node')
3896        if 'ingress-appgw' in addons:
3897            addon_profile = ManagedClusterAddonProfile(enabled=True, config={})
3898            appgw_name = self.context.get_appgw_name()
3899            appgw_subnet_cidr = self.context.get_appgw_subnet_cidr()
3900            appgw_id = self.context.get_appgw_id()
3901            appgw_subnet_id = self.context.get_appgw_subnet_id()
3902            appgw_watch_namespace = self.context.get_appgw_watch_namespace()
3903            if appgw_name is not None:
3904                addon_profile.config[CONST_INGRESS_APPGW_APPLICATION_GATEWAY_NAME] = appgw_name
3905            if appgw_subnet_cidr is not None:
3906                addon_profile.config[CONST_INGRESS_APPGW_SUBNET_CIDR] = appgw_subnet_cidr
3907            if appgw_id is not None:
3908                addon_profile.config[CONST_INGRESS_APPGW_APPLICATION_GATEWAY_ID] = appgw_id
3909            if appgw_subnet_id is not None:
3910                addon_profile.config[CONST_INGRESS_APPGW_SUBNET_ID] = appgw_subnet_id
3911            if appgw_watch_namespace is not None:
3912                addon_profile.config[CONST_INGRESS_APPGW_WATCH_NAMESPACE] = appgw_watch_namespace
3913            addon_profiles[CONST_INGRESS_APPGW_ADDON_NAME] = addon_profile
3914            # set intermediate
3915            self.context.set_intermediate("ingress_appgw_addon_enabled", True, overwrite_exists=True)
3916            addons.remove('ingress-appgw')
3917        if 'confcom' in addons:
3918            addon_profile = ManagedClusterAddonProfile(
3919                enabled=True, config={CONST_ACC_SGX_QUOTE_HELPER_ENABLED: "false"})
3920            if self.context.get_enable_sgxquotehelper():
3921                addon_profile.config[CONST_ACC_SGX_QUOTE_HELPER_ENABLED] = "true"
3922            addon_profiles[CONST_CONFCOM_ADDON_NAME] = addon_profile
3923            addons.remove('confcom')
3924        if 'open-service-mesh' in addons:
3925            addon_profile = ManagedClusterAddonProfile(enabled=True, config={})
3926            addon_profiles[CONST_OPEN_SERVICE_MESH_ADDON_NAME] = addon_profile
3927            addons.remove('open-service-mesh')
3928        mc.addon_profiles = addon_profiles
3929        return mc
3930
3931    def set_up_aad_profile(self, mc: ManagedCluster) -> ManagedCluster:
3932        """Set up aad profile for the ManagedCluster object.
3933
3934        :return: the ManagedCluster object
3935        """
3936        if not isinstance(mc, self.models.ManagedCluster):
3937            raise CLIInternalError(
3938                "Unexpected mc object with type '{}'.".format(type(mc))
3939            )
3940
3941        aad_profile = None
3942        enable_aad = self.context.get_enable_aad()
3943        if enable_aad:
3944            aad_profile = self.models.ManagedClusterAADProfile(
3945                managed=True,
3946                enable_azure_rbac=self.context.get_enable_azure_rbac(),
3947                # ids -> i_ds due to track 2 naming issue
3948                admin_group_object_i_ds=self.context.get_aad_admin_group_object_ids(),
3949                tenant_id=self.context.get_aad_tenant_id()
3950            )
3951        else:
3952            (
3953                aad_client_app_id,
3954                aad_server_app_id,
3955                aad_server_app_secret,
3956            ) = (
3957                self.context.get_aad_client_app_id_and_aad_server_app_id_and_aad_server_app_secret()
3958            )
3959            aad_tenant_id = self.context.get_aad_tenant_id()
3960            if any([aad_client_app_id, aad_server_app_id, aad_server_app_secret, aad_tenant_id]):
3961                aad_profile = self.models.ManagedClusterAADProfile(
3962                    client_app_id=aad_client_app_id,
3963                    server_app_id=aad_server_app_id,
3964                    server_app_secret=aad_server_app_secret,
3965                    tenant_id=aad_tenant_id
3966                )
3967        mc.aad_profile = aad_profile
3968        return mc
3969
3970    def set_up_api_server_access_profile(self, mc: ManagedCluster) -> ManagedCluster:
3971        """Set up api server access profile and fqdn subdomain for the ManagedCluster object.
3972
3973        :return: the ManagedCluster object
3974        """
3975        if not isinstance(mc, self.models.ManagedCluster):
3976            raise CLIInternalError(
3977                "Unexpected mc object with type '{}'.".format(type(mc))
3978            )
3979
3980        api_server_access_profile = None
3981        api_server_authorized_ip_ranges = self.context.get_api_server_authorized_ip_ranges()
3982        enable_private_cluster = self.context.get_enable_private_cluster()
3983        disable_public_fqdn = self.context.get_disable_public_fqdn()
3984        private_dns_zone = self.context.get_private_dns_zone()
3985        if api_server_authorized_ip_ranges or enable_private_cluster:
3986            api_server_access_profile = self.models.ManagedClusterAPIServerAccessProfile(
3987                authorized_ip_ranges=api_server_authorized_ip_ranges,
3988                enable_private_cluster=True if enable_private_cluster else None,
3989                enable_private_cluster_public_fqdn=False if disable_public_fqdn else None,
3990                private_dns_zone=private_dns_zone
3991            )
3992        mc.api_server_access_profile = api_server_access_profile
3993
3994        fqdn_subdomain = self.context.get_fqdn_subdomain()
3995        mc.fqdn_subdomain = fqdn_subdomain
3996        return mc
3997
3998    def set_up_identity(self, mc: ManagedCluster) -> ManagedCluster:
3999        """Set up identity for the ManagedCluster object.
4000
4001        :return: the ManagedCluster object
4002        """
4003        if not isinstance(mc, self.models.ManagedCluster):
4004            raise CLIInternalError(
4005                "Unexpected mc object with type '{}'.".format(type(mc))
4006            )
4007
4008        identity = None
4009        enable_managed_identity = self.context.get_enable_managed_identity()
4010        assign_identity = self.context.get_assign_identity()
4011        if enable_managed_identity and not assign_identity:
4012            identity = self.models.ManagedClusterIdentity(
4013                type="SystemAssigned"
4014            )
4015        elif enable_managed_identity and assign_identity:
4016            user_assigned_identity = {
4017                assign_identity: self.models.ManagedServiceIdentityUserAssignedIdentitiesValue()
4018            }
4019            identity = self.models.ManagedClusterIdentity(
4020                type="UserAssigned",
4021                user_assigned_identities=user_assigned_identity
4022            )
4023        mc.identity = identity
4024        return mc
4025
4026    def set_up_identity_profile(self, mc: ManagedCluster) -> ManagedCluster:
4027        """Set up identity profile for the ManagedCluster object.
4028
4029        The wrapper function "get_identity_by_msi_client" will be called (by "get_user_assigned_identity_object_id") to
4030        get the identity object, which internally use ManagedServiceIdentityClient to send the request.
4031        The function "_ensure_cluster_identity_permission_on_kubelet_identity" will be called to create a role
4032        assignment if necessary, which internally used AuthorizationManagementClient to send the request.
4033
4034        :return: the ManagedCluster object
4035        """
4036        if not isinstance(mc, self.models.ManagedCluster):
4037            raise CLIInternalError(
4038                "Unexpected mc object with type '{}'.".format(type(mc))
4039            )
4040
4041        identity_profile = None
4042        assign_kubelet_identity = self.context.get_assign_kubelet_identity()
4043        if assign_kubelet_identity:
4044            kubelet_identity = self.context.get_identity_by_msi_client(assign_kubelet_identity)
4045            identity_profile = {
4046                'kubeletidentity': self.models.UserAssignedIdentity(
4047                    resource_id=assign_kubelet_identity,
4048                    client_id=kubelet_identity.client_id,
4049                    object_id=kubelet_identity.principal_id
4050                )
4051            }
4052            cluster_identity_object_id = self.context.get_user_assigned_identity_object_id()
4053            # ensure the cluster identity has "Managed Identity Operator" role at the scope of kubelet identity
4054            _ensure_cluster_identity_permission_on_kubelet_identity(
4055                self.cmd,
4056                cluster_identity_object_id,
4057                assign_kubelet_identity)
4058        mc.identity_profile = identity_profile
4059        return mc
4060
4061    def set_up_auto_upgrade_profile(self, mc: ManagedCluster) -> ManagedCluster:
4062        """Set up auto upgrade profile for the ManagedCluster object.
4063
4064        :return: the ManagedCluster object
4065        """
4066        if not isinstance(mc, self.models.ManagedCluster):
4067            raise CLIInternalError(
4068                "Unexpected mc object with type '{}'.".format(type(mc))
4069            )
4070
4071        auto_upgrade_profile = None
4072        auto_upgrade_channel = self.context.get_auto_upgrade_channel()
4073        if auto_upgrade_channel:
4074            auto_upgrade_profile = self.models.ManagedClusterAutoUpgradeProfile(upgrade_channel=auto_upgrade_channel)
4075        mc.auto_upgrade_profile = auto_upgrade_profile
4076        return mc
4077
4078    def set_up_auto_scaler_profile(self, mc: ManagedCluster) -> ManagedCluster:
4079        """Set up autoscaler profile for the ManagedCluster object.
4080
4081        :return: the ManagedCluster object
4082        """
4083        if not isinstance(mc, self.models.ManagedCluster):
4084            raise CLIInternalError(
4085                "Unexpected mc object with type '{}'.".format(type(mc))
4086            )
4087
4088        cluster_autoscaler_profile = self.context.get_cluster_autoscaler_profile()
4089        mc.auto_scaler_profile = cluster_autoscaler_profile
4090        return mc
4091
4092    def set_up_sku(self, mc: ManagedCluster) -> ManagedCluster:
4093        """Set up sku (uptime sla) for the ManagedCluster object.
4094
4095        :return: the ManagedCluster object
4096        """
4097        if not isinstance(mc, self.models.ManagedCluster):
4098            raise CLIInternalError(
4099                "Unexpected mc object with type '{}'.".format(type(mc))
4100            )
4101
4102        if self.context.get_uptime_sla():
4103            mc.sku = self.models.ManagedClusterSKU(
4104                name="Basic",
4105                tier="Paid"
4106            )
4107        return mc
4108
4109    def set_up_extended_location(self, mc: ManagedCluster) -> ManagedCluster:
4110        """Set up extended location (edge zone) for the ManagedCluster object.
4111
4112        :return: the ManagedCluster object
4113        """
4114        if not isinstance(mc, self.models.ManagedCluster):
4115            raise CLIInternalError(
4116                "Unexpected mc object with type '{}'.".format(type(mc))
4117            )
4118
4119        edge_zone = self.context.get_edge_zone()
4120        if edge_zone:
4121            mc.extended_location = self.models.ExtendedLocation(
4122                name=edge_zone,
4123                type=self.models.ExtendedLocationTypes.EDGE_ZONE
4124            )
4125        return mc
4126
4127    def build_custom_headers(self, mc: ManagedCluster) -> None:
4128        """Build a dictionary contains custom headers.
4129
4130        This function will store an intermediate custom_headers.
4131
4132        :return: None
4133        """
4134        if not isinstance(mc, self.models.ManagedCluster):
4135            raise CLIInternalError(
4136                "Unexpected mc object with type '{}'.".format(type(mc))
4137            )
4138
4139        # Add AAD session key to header.
4140        # If principal_obj is None, we will not add this header, this can happen when the cluster enables managed
4141        # identity. In this case, the header is useless and that's OK to not add this header.
4142        custom_headers = None
4143        if mc.service_principal_profile:
4144            custom_headers = {'Ocp-Aad-Session-Key': self.context.get_intermediate("aad_session_key")}
4145        self.context.set_intermediate("custom_headers", custom_headers, overwrite_exists=True)
4146
4147    def construct_default_mc_profile(self) -> ManagedCluster:
4148        """The overall controller used to construct the default ManagedCluster profile.
4149
4150        The completely constructed ManagedCluster object will later be passed as a parameter to the underlying SDK
4151        (mgmt-containerservice) to send the actual request.
4152
4153        :return: the ManagedCluster object
4154        """
4155        # initialize the ManagedCluster object
4156        mc = self.init_mc()
4157        # set up agent pool profile(s)
4158        mc = self.set_up_agent_pool_profiles(mc)
4159        # set up linux profile (for ssh access)
4160        mc = self.set_up_linux_profile(mc)
4161        # set up windows profile
4162        mc = self.set_up_windows_profile(mc)
4163        # set up service principal profile
4164        mc = self.set_up_service_principal_profile(mc)
4165        # add role assignment for vent subnet
4166        self.process_add_role_assignment_for_vnet_subnet(mc)
4167        # attach acr (add role assignment for acr)
4168        self.process_attach_acr(mc)
4169        # set up network profile
4170        mc = self.set_up_network_profile(mc)
4171        # set up addon profiles
4172        mc = self.set_up_addon_profiles(mc)
4173        # set up aad profile
4174        mc = self.set_up_aad_profile(mc)
4175        # set up api server access profile and fqdn subdomain
4176        mc = self.set_up_api_server_access_profile(mc)
4177        # set up identity
4178        mc = self.set_up_identity(mc)
4179        # set up identity profile
4180        mc = self.set_up_identity_profile(mc)
4181        # set up auto upgrade profile
4182        mc = self.set_up_auto_upgrade_profile(mc)
4183        # set up auto scaler profile
4184        mc = self.set_up_auto_scaler_profile(mc)
4185        # set up sku
4186        mc = self.set_up_sku(mc)
4187        # set up extended location
4188        mc = self.set_up_extended_location(mc)
4189        # build custom header
4190        self.build_custom_headers(mc)
4191        return mc
4192
4193    def create_mc(self, mc: ManagedCluster) -> ManagedCluster:
4194        """Send request to create a real managed cluster.
4195
4196        The function "_put_managed_cluster_ensuring_permission" will be called to use the ContainerServiceClient to
4197        send a reqeust to create a real managed cluster, and also add necessary role assignments for some optional
4198        components.
4199
4200        :return: the ManagedCluster object
4201        """
4202        if not isinstance(mc, self.models.ManagedCluster):
4203            raise CLIInternalError(
4204                "Unexpected mc object with type '{}'.".format(type(mc))
4205            )
4206
4207        # Due to SPN replication latency, we do a few retries here
4208        max_retry = 30
4209        retry_exception = Exception(None)
4210        for _ in range(0, max_retry):
4211            try:
4212                created_cluster = _put_managed_cluster_ensuring_permission(
4213                    self.cmd,
4214                    self.client,
4215                    self.context.get_subscription_id(),
4216                    self.context.get_resource_group_name(),
4217                    self.context.get_name(),
4218                    mc,
4219                    self.context.get_intermediate("monitoring"),
4220                    self.context.get_intermediate("ingress_appgw_addon_enabled"),
4221                    self.context.get_intermediate("enable_virtual_node"),
4222                    self.context.get_intermediate("need_post_creation_vnet_permission_granting"),
4223                    self.context.get_vnet_subnet_id(),
4224                    self.context.get_enable_managed_identity(),
4225                    self.context.get_attach_acr(),
4226                    self.context.get_intermediate("custom_headers"),
4227                    self.context.get_no_wait())
4228                return created_cluster
4229            except CloudError as ex:
4230                retry_exception = ex
4231                if 'not found in Active Directory tenant' in ex.message:
4232                    time.sleep(3)
4233                else:
4234                    raise ex
4235        raise retry_exception
4236
4237
4238class AKSUpdateDecorator:
4239    def __init__(
4240        self,
4241        cmd: AzCliCommand,
4242        client: ContainerServiceClient,
4243        models: AKSModels,
4244        raw_parameters: Dict,
4245    ):
4246        """Internal controller of aks_update.
4247
4248        Break down the all-in-one aks_update function into several relatively independent functions (some of them have
4249        a certain order dependency) that only focus on a specific profile or process a specific piece of logic.
4250        In addition, an overall control function is provided. By calling the aforementioned independent functions one
4251        by one, a complete ManagedCluster object is gradually updated and finally requests are sent to update an
4252        existing cluster.
4253        """
4254        self.cmd = cmd
4255        self.client = client
4256        self.models = models
4257        # store the context in the process of assemble the ManagedCluster object
4258        self.context = AKSContext(cmd, raw_parameters, decorator_mode=DecoratorMode.UPDATE)
4259
4260    def check_raw_parameters(self):
4261        """Helper function to check whether any parameters are set.
4262
4263        If the values of all the parameters are the default values, the command execution will be terminated early and
4264        raise a RequiredArgumentMissingError. Neither the request to fetch or update the ManagedCluster object will be
4265        sent.
4266
4267        :return: None
4268        """
4269        # exclude some irrelevant or mandatory parameters
4270        excluded_keys = ("cmd", "client", "resource_group_name", "name")
4271        # check whether the remaining parameters are set
4272        # the default value None or False (and other empty values, like empty string) will be considered as not set
4273        is_changed = any(v for k, v in self.context.raw_param.items() if k not in excluded_keys)
4274
4275        # special cases
4276        # some parameters support the use of empty string or dictionary to update/remove previously set values
4277        is_default = (
4278            self.context.get_cluster_autoscaler_profile() is None and
4279            self.context.get_api_server_authorized_ip_ranges() is None
4280        )
4281
4282        if not is_changed and is_default:
4283            # Note: Uncomment the followings to automatically generate the error message.
4284            # option_names = [
4285            #     '"{}"'.format(format_parameter_name_to_option_name(x))
4286            #     for x in self.context.raw_param.keys()
4287            #     if x not in excluded_keys
4288            # ]
4289            # error_msg = "Please specify one or more of {}.".format(
4290            #     " or ".join(option_names)
4291            # )
4292            # raise RequiredArgumentMissingError(error_msg)
4293            raise RequiredArgumentMissingError(
4294                'Please specify one or more of "--enable-cluster-autoscaler" or '
4295                '"--disable-cluster-autoscaler" or '
4296                '"--update-cluster-autoscaler" or '
4297                '"--cluster-autoscaler-profile" or '
4298                '"--load-balancer-managed-outbound-ip-count" or '
4299                '"--load-balancer-outbound-ips" or '
4300                '"--load-balancer-outbound-ip-prefixes" or '
4301                '"--load-balancer-outbound-ports" or '
4302                '"--load-balancer-idle-timeout" or '
4303                '"--auto-upgrade-channel" or '
4304                '"--attach-acr" or "--detach-acr" or '
4305                '"--uptime-sla" or '
4306                '"--no-uptime-sla" or '
4307                '"--api-server-authorized-ip-ranges" or '
4308                '"--enable-aad" or '
4309                '"--aad-tenant-id" or '
4310                '"--aad-admin-group-object-ids" or '
4311                '"--enable-ahub" or '
4312                '"--disable-ahub" or '
4313                '"--windows-admin-password" or '
4314                '"--enable-managed-identity" or '
4315                '"--assign-identity" or '
4316                '"--enable-azure-rbac" or '
4317                '"--disable-azure-rbac" or '
4318                '"--enable-public-fqdn" or '
4319                '"--disable-public-fqdn" or '
4320                '"--tags"'
4321            )
4322
4323    def fetch_mc(self) -> ManagedCluster:
4324        """Get the ManagedCluster object currently in use and attach it to internal context.
4325
4326        Internally send request using ContainerServiceClient and parameters name (cluster) and resource group name.
4327
4328        :return: the ManagedCluster object
4329        """
4330        mc = self.client.get(self.context.get_resource_group_name(), self.context.get_name())
4331
4332        # attach mc to AKSContext
4333        self.context.attach_mc(mc)
4334        return mc
4335
4336    def update_tags(self, mc: ManagedCluster) -> ManagedCluster:
4337        """Update tags for the ManagedCluster object.
4338
4339        :return: the ManagedCluster object
4340        """
4341        if not isinstance(mc, self.models.ManagedCluster):
4342            raise CLIInternalError(
4343                "Unexpected mc object with type '{}'.".format(type(mc))
4344            )
4345
4346        tags = self.context.get_tags()
4347        if tags is not None:
4348            mc.tags = tags
4349        return mc
4350
4351    def update_auto_scaler_profile(self, mc):
4352        """Update autoscaler profile for the ManagedCluster object.
4353
4354        :return: the ManagedCluster object
4355        """
4356        if not isinstance(mc, self.models.ManagedCluster):
4357            raise CLIInternalError(
4358                "Unexpected mc object with type '{}'.".format(type(mc))
4359            )
4360
4361        (
4362            update_cluster_autoscaler,
4363            enable_cluster_autoscaler,
4364            disable_cluster_autoscaler,
4365            min_count,
4366            max_count,
4367        ) = (
4368            self.context.get_update_enable_disable_cluster_autoscaler_and_min_max_count()
4369        )
4370
4371        if update_cluster_autoscaler or enable_cluster_autoscaler:
4372            mc.agent_pool_profiles[0].enable_auto_scaling = True
4373            mc.agent_pool_profiles[0].min_count = int(min_count)
4374            mc.agent_pool_profiles[0].max_count = int(max_count)
4375
4376        if disable_cluster_autoscaler:
4377            mc.agent_pool_profiles[0].enable_auto_scaling = False
4378            mc.agent_pool_profiles[0].min_count = None
4379            mc.agent_pool_profiles[0].max_count = None
4380
4381        cluster_autoscaler_profile = self.context.get_cluster_autoscaler_profile()
4382        if cluster_autoscaler_profile is not None:
4383            # update profile (may clear profile with empty dictionary)
4384            mc.auto_scaler_profile = cluster_autoscaler_profile
4385        return mc
4386
4387    def process_attach_detach_acr(self, mc: ManagedCluster) -> None:
4388        """Attach or detach acr for the cluster.
4389
4390        The function "_ensure_aks_acr" will be called to create or delete an AcrPull role assignment for the acr, which
4391        internally used AuthorizationManagementClient to send the request.
4392
4393        :return: None
4394        """
4395        if not isinstance(mc, self.models.ManagedCluster):
4396            raise CLIInternalError(
4397                "Unexpected mc object with type '{}'.".format(type(mc))
4398            )
4399
4400        subscription_id = self.context.get_subscription_id()
4401        client_id = self.context.get_client_id_from_identity_or_sp_profile()
4402        attach_acr = self.context.get_attach_acr()
4403        detach_acr = self.context.get_detach_acr()
4404
4405        if attach_acr:
4406            _ensure_aks_acr(self.cmd,
4407                            client_id=client_id,
4408                            acr_name_or_id=attach_acr,
4409                            subscription_id=subscription_id)
4410
4411        if detach_acr:
4412            _ensure_aks_acr(self.cmd,
4413                            client_id=client_id,
4414                            acr_name_or_id=detach_acr,
4415                            subscription_id=subscription_id,
4416                            detach=True)
4417
4418    def update_sku(self, mc: ManagedCluster) -> ManagedCluster:
4419        """Update sku (uptime sla) for the ManagedCluster object.
4420
4421        :return: the ManagedCluster object
4422        """
4423        if not isinstance(mc, self.models.ManagedCluster):
4424            raise CLIInternalError(
4425                "Unexpected mc object with type '{}'.".format(type(mc))
4426            )
4427
4428        if self.context.get_uptime_sla():
4429            mc.sku = self.models.ManagedClusterSKU(
4430                name="Basic",
4431                tier="Paid"
4432            )
4433
4434        if self.context.get_no_uptime_sla():
4435            mc.sku = self.models.ManagedClusterSKU(
4436                name="Basic",
4437                tier="Free"
4438            )
4439        return mc
4440
4441    def update_default_mc_profile(self) -> ManagedCluster:
4442        """The overall controller used to update the default ManagedCluster profile.
4443
4444        Note: To reduce the risk of regression introduced by refactoring, this function is not complete and is being
4445        implemented gradually.
4446
4447        The completely updated ManagedCluster object will later be passed as a parameter to the underlying SDK
4448        (mgmt-containerservice) to send the actual request.
4449
4450        :return: the ManagedCluster object
4451        """
4452        # check raw parameters
4453        self.check_raw_parameters()
4454        # fetch the ManagedCluster object
4455        mc = self.fetch_mc()
4456        # update auto scaler profile
4457        mc = self.update_auto_scaler_profile(mc)
4458        # attach or detach acr (add or delete role assignment for acr)
4459        self.process_attach_detach_acr(mc)
4460        # update sku (uptime sla)
4461        mc = self.update_sku(mc)
4462
4463        return mc
4464
4465    def update_mc(self) -> ManagedCluster:
4466        """Send request to update the existing managed cluster.
4467
4468        Note: To reduce the risk of regression introduced by refactoring, this function is not complete and is being
4469        implemented gradually.
4470
4471        The function "_put_managed_cluster_ensuring_permission" will be called to use the ContainerServiceClient to
4472        send a reqeust to update the existing managed cluster, and also add necessary role assignments for some optional
4473        components.
4474
4475        :return: the ManagedCluster object
4476        """
4477