1# aliases.py
2# Resolving aliases in CLI arguments.
3#
4# Copyright (C) 2018 Red Hat, Inc.
5#
6# This copyrighted material is made available to anyone wishing to use,
7# modify, copy, or redistribute it subject to the terms and conditions of
8# the GNU General Public License v.2, or (at your option) any later version.
9# This program is distributed in the hope that it will be useful, but WITHOUT
10# ANY WARRANTY expressed or implied, including the implied warranties of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
12# Public License for more details.  You should have received a copy of the
13# GNU General Public License along with this program; if not, write to the
14# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
16# source code or documentation are not subject to the GNU General Public
17# License and may only be used or replicated with the express permission of
18# Red Hat, Inc.
19#
20
21from __future__ import absolute_import
22from __future__ import unicode_literals
23from dnf.i18n import _
24
25import collections
26import dnf.cli
27from dnf.conf.config import PRIO_DEFAULT
28import dnf.exceptions
29import libdnf.conf
30import logging
31import os
32import os.path
33
34logger = logging.getLogger('dnf')
35
36ALIASES_DROPIN_DIR = '/etc/dnf/aliases.d/'
37ALIASES_CONF_PATH = os.path.join(ALIASES_DROPIN_DIR, 'ALIASES.conf')
38ALIASES_USER_PATH = os.path.join(ALIASES_DROPIN_DIR, 'USER.conf')
39
40
41class AliasesConfig(object):
42    def __init__(self, path):
43        self._path = path
44        self._parser = libdnf.conf.ConfigParser()
45        self._parser.read(self._path)
46
47    @property
48    def enabled(self):
49        option = libdnf.conf.OptionBool(True)
50        try:
51            option.set(PRIO_DEFAULT, self._parser.getData()["main"]["enabled"])
52        except IndexError:
53            pass
54        return option.getValue()
55
56    @property
57    def aliases(self):
58        result = collections.OrderedDict()
59        section = "aliases"
60        if not self._parser.hasSection(section):
61            return result
62        for key in self._parser.options(section):
63            value = self._parser.getValue(section, key)
64            if not value:
65                continue
66            result[key] = value.split()
67        return result
68
69
70class Aliases(object):
71    def __init__(self):
72        self.aliases = collections.OrderedDict()
73        self.conf = None
74        self.enabled = True
75
76        if self._disabled_by_environ():
77            self.enabled = False
78            return
79
80        self._load_main()
81
82        if not self.enabled:
83            return
84
85        self._load_aliases()
86
87    def _disabled_by_environ(self):
88        option = libdnf.conf.OptionBool(True)
89        try:
90            option.set(PRIO_DEFAULT, os.environ['DNF_DISABLE_ALIASES'])
91            return option.getValue()
92        except KeyError:
93            return False
94        except RuntimeError:
95            logger.warning(
96                _('Unexpected value of environment variable: '
97                  'DNF_DISABLE_ALIASES=%s'), os.environ['DNF_DISABLE_ALIASES'])
98            return True
99
100    def _load_conf(self, path):
101        try:
102            return AliasesConfig(path)
103        except RuntimeError as e:
104            raise dnf.exceptions.ConfigError(
105                _('Parsing file "%s" failed: %s') % (path, e))
106        except IOError as e:
107            raise dnf.exceptions.ConfigError(
108                _('Cannot read file "%s": %s') % (path, e))
109
110    def _load_main(self):
111        try:
112            self.conf = self._load_conf(ALIASES_CONF_PATH)
113            self.enabled = self.conf.enabled
114        except dnf.exceptions.ConfigError as e:
115            logger.debug(_('Config error: %s'), e)
116
117    def _load_aliases(self, filenames=None):
118        if filenames is None:
119            try:
120                filenames = self._dropin_dir_filenames()
121            except dnf.exceptions.ConfigError:
122                return
123        for filename in filenames:
124            try:
125                conf = self._load_conf(filename)
126                if conf.enabled:
127                    self.aliases.update(conf.aliases)
128            except dnf.exceptions.ConfigError as e:
129                logger.warning(_('Config error: %s'), e)
130
131    def _dropin_dir_filenames(self):
132        # Get default aliases config filenames:
133        #   all files from ALIASES_DROPIN_DIR,
134        #   and ALIASES_USER_PATH as the last one (-> override all others)
135        ignored_filenames = [os.path.basename(ALIASES_CONF_PATH),
136                             os.path.basename(ALIASES_USER_PATH)]
137
138        def _ignore_filename(filename):
139            return filename in ignored_filenames or\
140                filename.startswith('.') or\
141                not filename.endswith(('.conf', '.CONF'))
142
143        filenames = []
144        try:
145            if not os.path.exists(ALIASES_DROPIN_DIR):
146                os.mkdir(ALIASES_DROPIN_DIR)
147            for fn in sorted(os.listdir(ALIASES_DROPIN_DIR)):
148                if _ignore_filename(fn):
149                    continue
150                filenames.append(os.path.join(ALIASES_DROPIN_DIR, fn))
151        except (IOError, OSError) as e:
152            raise dnf.exceptions.ConfigError(e)
153        if os.path.exists(ALIASES_USER_PATH):
154            filenames.append(ALIASES_USER_PATH)
155        return filenames
156
157    def _resolve(self, args):
158        stack = []
159        self.prefix_options = []
160
161        def store_prefix(args):
162            num = 0
163            for arg in args:
164                if arg and arg[0] != '-':
165                    break
166                num += 1
167
168            self.prefix_options += args[:num]
169
170            return args[num:]
171
172        def subresolve(args):
173            suffix = store_prefix(args)
174
175            if (not suffix or  # Current alias on stack is resolved
176                    suffix[0] not in self.aliases or  # End resolving
177                    suffix[0].startswith('\\')):  # End resolving
178                try:
179                    stack.pop()
180
181                    # strip the '\' if it exists
182                    if suffix[0].startswith('\\'):
183                        suffix[0] = suffix[0][1:]
184                except IndexError:
185                    pass
186
187                return suffix
188
189            if suffix[0] in stack:  # Infinite recursion detected
190                raise dnf.exceptions.Error(
191                    _('Aliases contain infinite recursion'))
192
193            # Next word must be an alias
194            stack.append(suffix[0])
195            current_alias_result = subresolve(self.aliases[suffix[0]])
196            if current_alias_result:  # We reached non-alias or '\'
197                return current_alias_result + suffix[1:]
198            else:  # Need to resolve aliases in the rest
199                return subresolve(suffix[1:])
200
201        suffix = subresolve(args)
202        return self.prefix_options + suffix
203
204    def resolve(self, args):
205        if self.enabled:
206            try:
207                args = self._resolve(args)
208            except dnf.exceptions.Error as e:
209                logger.error(_('%s, using original arguments.'), e)
210        return args
211