1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5
6import os
7from knack.util import CLIError
8from knack.log import get_logger
9
10logger = get_logger(__name__)
11
12
13def validate_headers(namespace):
14    """Extracts multiple space-separated headers in key[=value] format. """
15    if isinstance(namespace.headers, list):
16        headers_dict = {}
17        for item in namespace.headers:
18            headers_dict.update(validate_header(item))
19        namespace.headers = headers_dict
20
21
22def validate_header(string):
23    """Extracts a single header in key[=value] format. """
24    result = {}
25    if string:
26        comps = string.split('=', 1)
27        result = {comps[0]: comps[1]} if len(comps) > 1 else {string: ''}
28    return result
29
30
31def validate_arg(namespace):
32    if isinstance(namespace.arg, list):
33        arguments_list = []
34        for item in namespace.arg:
35            arguments_list.append(validate_task_argument(item, False))
36        namespace.arg = arguments_list
37
38
39def validate_secret_arg(namespace):
40    if isinstance(namespace.secret_arg, list):
41        secret_arguments_list = []
42        for item in namespace.secret_arg:
43            secret_arguments_list.append(validate_task_argument(item, True))
44        namespace.secret_arg = secret_arguments_list
45
46
47def validate_set(namespace):
48    if isinstance(namespace.set_value, list):
49        set_list = []
50        for item in namespace.set_value:
51            set_list.append(validate_task_value(item, False))
52        namespace.set_value = set_list
53
54
55def validate_set_secret(namespace):
56    if isinstance(namespace.set_secret, list):
57        set_secret_list = []
58        for item in namespace.set_secret:
59            set_secret_list.append(validate_task_value(item, True))
60        namespace.set_secret = set_secret_list
61
62
63def validate_task_value(string, is_secret):
64    """Extracts a single SetValue in key[=value] format. """
65    if string:
66        comps = string.split('=', 1)
67        if len(comps) > 1:
68            return {'type': 'SetValue', 'name': comps[0], 'value': comps[1], 'isSecret': is_secret}
69        return {'type': 'SetValue', 'name': comps[0], 'value': '', 'isSecret': is_secret}
70    return None
71
72
73def validate_task_argument(string, is_secret):
74    """Extracts a single argument in key[=value] format. """
75    if string:
76        comps = string.split('=', 1)
77        if len(comps) > 1:
78            return {'type': 'Argument', 'name': comps[0], 'value': comps[1], 'isSecret': is_secret}
79        # If no value, check if the argument exists as an environment variable
80        local_value = os.environ.get(comps[0])
81        if local_value is not None:
82            return {'type': 'Argument', 'name': comps[0], 'value': local_value, 'isSecret': is_secret}
83        return {'type': 'Argument', 'name': comps[0], 'value': '', 'isSecret': is_secret}
84    return None
85
86
87def validate_retention_days(namespace):
88    days = namespace.days
89    if days and (days < 0 or days > 365):
90        raise CLIError("Invalid value for days: should be from 0 to 365")
91
92
93def validate_registry_name(cmd, namespace):
94    """Omit login server endpoint suffix."""
95    registry = namespace.registry_name
96    suffixes = cmd.cli_ctx.cloud.suffixes
97    # Some clouds do not define 'acr_login_server_endpoint' (e.g. AzureGermanCloud)
98    if registry and hasattr(suffixes, 'acr_login_server_endpoint'):
99        acr_suffix = suffixes.acr_login_server_endpoint
100        pos = registry.find(acr_suffix)
101        if pos > 0:
102            logger.warning("The login server endpoint suffix '%s' is automatically omitted.", acr_suffix)
103            namespace.registry_name = registry[:pos]
104
105
106def validate_expiration_time(namespace):
107    import datetime
108    DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
109    if namespace.expiration:
110        try:
111            namespace.expiration = datetime.datetime.strptime(namespace.expiration, DATE_TIME_FORMAT)
112        except ValueError:
113            raise CLIError("Input '{}' is not valid datetime. Valid example: 2025-12-31T12:59:59Z".format(
114                namespace.expiration))
115