1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7#     http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import logging
14import os
15import shlex
16import subprocess
17
18from botocore.configloader import raw_config_parse
19
20from awscli.compat import compat_shell_quote
21from awscli.commands import CLICommand
22from awscli.utils import emit_top_level_args_parsed_event
23
24
25LOG = logging.getLogger(__name__)
26
27
28class InvalidAliasException(Exception):
29    pass
30
31
32class AliasLoader(object):
33    def __init__(self,
34                 alias_filename=os.path.expanduser(
35                     os.path.join('~', '.aws', 'cli', 'alias'))):
36        """Interface for loading and interacting with alias file
37
38        :param alias_filename: The name of the file to load aliases from.
39            This file must be an INI file.
40        """
41        self._filename = alias_filename
42        self._aliases = None
43
44    def _build_aliases(self):
45        self._aliases = self._load_aliases()
46        self._cleanup_alias_values(self._aliases.get('toplevel', {}))
47
48    def _load_aliases(self):
49        if os.path.exists(self._filename):
50            return raw_config_parse(
51                self._filename, parse_subsections=False)
52        return {'toplevel': {}}
53
54    def _cleanup_alias_values(self, aliases):
55        for alias in aliases:
56            # Beginning and end line separators should not be included
57            # in the internal representation of the alias value.
58            aliases[alias] = aliases[alias].strip()
59
60    def get_aliases(self):
61        if self._aliases is None:
62            self._build_aliases()
63        return self._aliases.get('toplevel', {})
64
65
66class AliasCommandInjector(object):
67    def __init__(self, session, alias_loader):
68        """Injects alias commands for a command table
69
70        :type session: botocore.session.Session
71        :param session: The botocore session
72
73        :type alias_loader: awscli.alias.AliasLoader
74        :param alias_loader: The alias loader to use
75        """
76        self._session = session
77        self._alias_loader = alias_loader
78
79    def inject_aliases(self, command_table, parser):
80        for alias_name, alias_value in \
81                self._alias_loader.get_aliases().items():
82            if alias_value.startswith('!'):
83                alias_cmd = ExternalAliasCommand(alias_name, alias_value)
84            else:
85                service_alias_cmd_args = [
86                    alias_name, alias_value, self._session, command_table,
87                    parser
88                ]
89                # If the alias name matches something already in the
90                # command table provide the command it is about
91                # to clobber as a possible reference that it will
92                # need to proxy to.
93                if alias_name in command_table:
94                    service_alias_cmd_args.append(
95                        command_table[alias_name])
96                alias_cmd = ServiceAliasCommand(*service_alias_cmd_args)
97            command_table[alias_name] = alias_cmd
98
99
100class BaseAliasCommand(CLICommand):
101    _UNDOCUMENTED = True
102
103    def __init__(self, alias_name, alias_value):
104        """Base class for alias command
105
106        :type alias_name: string
107        :param alias_name: The name of the alias
108
109        :type alias_value: string
110        :param alias_value: The parsed value of the alias. This can be
111            retrieved from `AliasLoader.get_aliases()[alias_name]`
112        """
113        self._alias_name = alias_name
114        self._alias_value = alias_value
115
116    def __call__(self, args, parsed_args):
117        raise NotImplementedError('__call__')
118
119    @property
120    def name(self):
121        return self._alias_name
122
123    @name.setter
124    def name(self, value):
125        self._alias_name = value
126
127
128class ServiceAliasCommand(BaseAliasCommand):
129    UNSUPPORTED_GLOBAL_PARAMETERS = [
130        'debug',
131        'profile'
132    ]
133
134    def __init__(self, alias_name, alias_value, session, command_table,
135                 parser, shadow_proxy_command=None):
136        """Command for a `toplevel` subcommand alias
137
138        :type alias_name: string
139        :param alias_name: The name of the alias
140
141        :type alias_value: string
142        :param alias_value: The parsed value of the alias. This can be
143            retrieved from `AliasLoader.get_aliases()[alias_name]`
144
145        :type session: botocore.session.Session
146        :param session: The botocore session
147
148        :type command_table: dict
149        :param command_table: The command table containing all of the
150            possible service command objects that a particular alias could
151            redirect to.
152
153        :type parser: awscli.argparser.MainArgParser
154        :param parser: The parser to parse commands provided at the top level
155            of a CLI command which includes service commands and global
156            parameters. This is used to parse the service commmand and any
157            global parameters from the alias's value.
158
159        :type shadow_proxy_command: CLICommand
160        :param shadow_proxy_command: A built-in command that
161            potentially shadows the alias in name. If the alias
162            references this command in its value, the alias should proxy
163            to this command as oppposed to proxy to itself in the command
164            table
165        """
166        super(ServiceAliasCommand, self).__init__(alias_name, alias_value)
167        self._session = session
168        self._command_table = command_table
169        self._parser = parser
170        self._shadow_proxy_command = shadow_proxy_command
171
172    def __call__(self, args, parsed_globals):
173        alias_args = self._get_alias_args()
174        parsed_alias_args, remaining = self._parser.parse_known_args(
175            alias_args)
176        self._update_parsed_globals(parsed_alias_args, parsed_globals)
177        # Take any of the remaining arguments that were not parsed out and
178        # prepend them to the remaining args provided to the alias.
179        remaining.extend(args)
180        LOG.debug(
181            'Alias %r passing on arguments: %r to %r command',
182            self._alias_name, remaining, parsed_alias_args.command)
183        # Pass the update remaing args and global args to the service command
184        # the alias proxied to.
185        command = self._command_table[parsed_alias_args.command]
186        if self._shadow_proxy_command:
187            shadow_name = self._shadow_proxy_command.name
188            # Use the shadow command only if the aliases value
189            # uses that command indicating it needs to proxy over to
190            # a built-in command.
191            if shadow_name == parsed_alias_args.command:
192                LOG.debug(
193                    'Using shadowed command object: %s '
194                    'for alias: %s', self._shadow_proxy_command,
195                    self._alias_name
196                )
197                command = self._shadow_proxy_command
198        return command(remaining, parsed_globals)
199
200    def _get_alias_args(self):
201        try:
202            alias_args = shlex.split(self._alias_value)
203        except ValueError as e:
204            raise InvalidAliasException(
205                'Value of alias "%s" could not be parsed. '
206                'Received error: %s when parsing:\n%s' % (
207                    self._alias_name, e, self._alias_value)
208            )
209
210        alias_args = [arg.strip(os.linesep) for arg in alias_args]
211        LOG.debug(
212            'Expanded subcommand alias %r with value: %r to: %r',
213            self._alias_name, self._alias_value, alias_args
214        )
215        return alias_args
216
217    def _update_parsed_globals(self, parsed_alias_args, parsed_globals):
218        global_params_to_update = self._get_global_parameters_to_update(
219            parsed_alias_args)
220        # Emit the top level args parsed event to ensure all possible
221        # customizations that typically get applied are applied to the
222        # global parameters provided in the alias before updating
223        # the original provided global parameter values
224        # and passing those onto subsequent commands.
225        emit_top_level_args_parsed_event(self._session, parsed_alias_args)
226        for param_name in global_params_to_update:
227            updated_param_value = getattr(parsed_alias_args, param_name)
228            setattr(parsed_globals, param_name, updated_param_value)
229
230    def _get_global_parameters_to_update(self, parsed_alias_args):
231        # Retrieve a list of global parameters that the newly parsed args
232        # from the alias will have to clobber from the originally provided
233        # parsed globals.
234        global_params_to_update = []
235        for parsed_param, value in vars(parsed_alias_args).items():
236            # To determine which parameters in the alias were global values
237            # compare the parsed alias parameters to the default as
238            # specified by the parser. If the parsed values from the alias
239            # differs from the default value in the parser,
240            # that global parameter must have been provided in the alias.
241            if self._parser.get_default(parsed_param) != value:
242                if parsed_param in self.UNSUPPORTED_GLOBAL_PARAMETERS:
243                    raise InvalidAliasException(
244                        'Global parameter "--%s" detected in alias "%s" '
245                        'which is not support in subcommand aliases.' % (
246                            parsed_param, self._alias_name))
247                else:
248                    global_params_to_update.append(parsed_param)
249        return global_params_to_update
250
251
252class ExternalAliasCommand(BaseAliasCommand):
253    def __init__(self, alias_name, alias_value, invoker=subprocess.call):
254        """Command for external aliases
255
256        Executes command external of CLI as opposed to being a proxy
257        to another command.
258
259        :type alias_name: string
260        :param alias_name: The name of the alias
261
262        :type alias_value: string
263        :param alias_value: The parsed value of the alias. This can be
264            retrieved from `AliasLoader.get_aliases()[alias_name]`
265
266        :type invoker: callable
267        :param invoker: Callable to run arguments of external alias. The
268            signature should match that of ``subprocess.call``
269        """
270        self._alias_name = alias_name
271        self._alias_value = alias_value
272        self._invoker = invoker
273
274    def __call__(self, args, parsed_globals):
275        command_components = [
276            self._alias_value[1:]
277        ]
278        command_components.extend(compat_shell_quote(a) for a in args)
279        command = ' '.join(command_components)
280        LOG.debug(
281            'Using external alias %r with value: %r to run: %r',
282            self._alias_name, self._alias_value, command)
283        return self._invoker(command, shell=True)
284