# misc.py
# Copyright (C) 2012-2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#

"""
Assorted utility functions for yum.
"""

from __future__ import print_function, absolute_import
from __future__ import unicode_literals
from dnf.pycomp import base64_decodebytes, basestring, unicode
from stat import *
import libdnf.utils
import dnf.const
import dnf.crypto
import dnf.exceptions
import dnf.i18n
import errno
import glob
import io
import os
import os.path
import pwd
import re
import shutil
import tempfile

_default_checksums = ['sha256']


_re_compiled_glob_match = None
def re_glob(s):
    """ Tests if a string is a shell wildcard. """
    global _re_compiled_glob_match
    if _re_compiled_glob_match is None:
        _re_compiled_glob_match = re.compile(r'[*?]|\[.+\]').search
    return _re_compiled_glob_match(s)
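
# Illustrative behavior (the example strings below are arbitrary):
#     re_glob('kernel-3.*')   -> match object  (contains the '*' wildcard)
#     re_glob('kernel-3.10')  -> None          (no shell wildcard characters)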

_re_compiled_full_match = None
def re_full_search_needed(s):
    """ Tests if a string needs a full nevra match, instead of just name. """
    global _re_compiled_full_match
    if _re_compiled_full_match is None:
        # A glob, or a "." or "-" separator, followed by something (the ".")
        one = re.compile(r'.*([-.*?]|\[.+\]).').match
        # Any epoch, for envra
        two = re.compile('[0-9]+:').match
        _re_compiled_full_match = (one, two)
    for rec in _re_compiled_full_match:
        if rec(s):
            return True
    return False
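
# Illustrative behavior (the example strings below are arbitrary):
#     re_full_search_needed('yum')            -> False  (plain package name)
#     re_full_search_needed('yum-3.4.3')      -> True   ('-'/'.' separators present)
#     re_full_search_needed('1:yum-3.4.3-1')  -> True   (explicit epoch)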

def get_default_chksum_type():
    return _default_checksums[0]

class GenericHolder(object):
    """Generic Holder class used to hold other objects of known types.
       It exists purely to be able to do object.somestuff, object.someotherstuff
       or object[key] and to pass the object to another function that will
       understand it."""

    def __init__(self, iter=None):
        self.__iter = iter

    def __iter__(self):
        if self.__iter is not None:
            return iter(self[self.__iter])

    def __getitem__(self, item):
        if hasattr(self, item):
            return getattr(self, item)
        else:
            raise KeyError(item)

    def all_lists(self):
        """Return a dictionary of all list attributes."""
        return {key: list_ for key, list_ in vars(self).items()
                if type(list_) is list}

    def merge_lists(self, other):
        """ Concatenate the list attributes from 'other' to ours. """
        for (key, val) in other.all_lists().items():
            vars(self).setdefault(key, []).extend(val)
        return self
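
# A minimal usage sketch (the attribute names below are hypothetical):
#     holder = GenericHolder(iter='installed')
#     holder.installed = ['pkg-a', 'pkg-b']
#     holder.available = ['pkg-c']
#     holder.all_lists()   -> {'installed': [...], 'available': [...]}
#     list(holder)         -> ['pkg-a', 'pkg-b']   (iterates the 'installed' list)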

def procgpgkey(rawkey):
    '''Convert ASCII-armored GPG key to binary
    '''

    # Normalise newlines
    rawkey = re.sub(b'\r\n?', b'\n', rawkey)

    # Extract block
    block = io.BytesIO()
    inblock = 0
    pastheaders = 0
    for line in rawkey.split(b'\n'):
        if line.startswith(b'-----BEGIN PGP PUBLIC KEY BLOCK-----'):
            inblock = 1
        elif inblock and line.strip() == b'':
            pastheaders = 1
        elif inblock and line.startswith(b'-----END PGP PUBLIC KEY BLOCK-----'):
            # Hit the end of the block, get out
            break
        elif pastheaders and line.startswith(b'='):
            # Hit the CRC line, don't include this and stop
            break
        elif pastheaders:
            block.write(line + b'\n')

    # Decode and return
    return base64_decodebytes(block.getvalue())
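
# Illustrative use (the key path below is hypothetical): the armored key is
# read as bytes and the binary key material comes back:
#     with open('/etc/pki/rpm-gpg/RPM-GPG-KEY-example', 'rb') as keyfile:
#         binkey = procgpgkey(keyfile.read())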


def keyInstalled(ts, keyid, timestamp):
    '''
    Return whether the GPG key described by the given keyid and timestamp is
    installed in the rpmdb.

    The keyid is compared against the 'version' tag of the installed
    gpg-pubkey headers; the timestamp is compared against the 'release' tag
    parsed as a hexadecimal integer, so it should be passed as an integer.
    The ts argument is an rpm transaction set object.

    Return values:
        - -1      key is not installed
        - 0       key with matching ID and timestamp is installed
        - 1       key with matching ID is installed but has an older timestamp
        - 2       key with matching ID is installed but has a newer timestamp

    No effort is made to handle duplicates. The first matching keyid is used to
    calculate the return result.
    '''
    # Search
    for hdr in ts.dbMatch('name', 'gpg-pubkey'):
        if hdr['version'] == keyid:
            installedts = int(hdr['release'], 16)
            if installedts == timestamp:
                return 0
            elif installedts < timestamp:
                return 1
            else:
                return 2

    return -1
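
# A minimal sketch of the expected call pattern (the keyid and timestamp
# values below are hypothetical; rpm.TransactionSet() is the usual way to
# obtain `ts`):
#     import rpm
#     ts = rpm.TransactionSet()
#     state = keyInstalled(ts, 'f4a80eb5', 1405000000)
#     # -1 missing, 0 exact match, 1 older timestamp, 2 newer timestamp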


def import_key_to_pubring(rawkey, keyid, gpgdir=None, make_ro_copy=True):
    if not os.path.exists(gpgdir):
        os.makedirs(gpgdir)

    with dnf.crypto.pubring_dir(gpgdir), dnf.crypto.Context() as ctx:
        # import the key
        with open(os.path.join(gpgdir, 'gpg.conf'), 'wb') as fp:
            fp.write(b'')
        ctx.op_import(rawkey)

        if make_ro_copy:
            rodir = gpgdir + '-ro'
            if not os.path.exists(rodir):
                os.makedirs(rodir, mode=0o755)
                for f in glob.glob(gpgdir + '/*'):
                    basename = os.path.basename(f)
                    ro_f = rodir + '/' + basename
                    shutil.copy(f, ro_f)
                    os.chmod(ro_f, 0o755)
                # yes it is this stupid, why do you ask?
                opts = """lock-never
    no-auto-check-trustdb
    trust-model direct
    no-expensive-trust-checks
    no-permission-warning
    preserve-permissions
    """
                # write the gpg.conf for the read-only copy and make it
                # world-readable (the mode can't be passed to open() itself)
                ro_conf = os.path.join(rodir, 'gpg.conf')
                with open(ro_conf, 'w') as fp:
                    fp.write(opts)
                os.chmod(ro_conf, 0o755)

        return True


def getCacheDir():
    """Return a path to a valid and safe cachedir - only used when not running
       as root or when --tempcache is set."""

    uid = os.geteuid()
    try:
        usertup = pwd.getpwuid(uid)
        username = dnf.i18n.ucd(usertup[0])
        prefix = '%s-%s-' % (dnf.const.PREFIX, username)
    except KeyError:
        prefix = '%s-%s-' % (dnf.const.PREFIX, uid)

    # check for existing /var/tmp/prefix-* directories
    dirpath = '%s/%s*' % (dnf.const.TMPDIR, prefix)
    cachedirs = sorted(glob.glob(dirpath))
    for thisdir in cachedirs:
        stats = os.lstat(thisdir)
        if S_ISDIR(stats.st_mode) and S_IMODE(stats.st_mode) == 0o700 and stats.st_uid == uid:
            return thisdir

    # make the dir (tempfile.mkdtemp())
    cachedir = tempfile.mkdtemp(prefix=prefix, dir=dnf.const.TMPDIR)
    return cachedir

def seq_max_split(seq, max_entries):
    """ Given a seq, split into a list of lists of length max_entries each. """
    ret = []
    num = len(seq)
    seq = list(seq)  # Trying to use a set/etc. here is bad
    beg = 0
    while num > max_entries:
        end = beg + max_entries
        ret.append(seq[beg:end])
        beg += max_entries
        num -= max_entries
    ret.append(seq[beg:])
    return ret
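
# Illustrative behavior (the values below are arbitrary):
#     seq_max_split([1, 2, 3, 4, 5], 2)  ->  [[1, 2], [3, 4], [5]]
#     seq_max_split([], 2)               ->  [[]]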

def unlink_f(filename):
    """ Call os.unlink, but don't die if the file isn't there. This is the main
        difference between "rm -f" and plain "rm". """
    try:
        os.unlink(filename)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise

def stat_f(filename, ignore_EACCES=False):
    """ Call os.stat(), but don't die if the file isn't there; return None
        instead (also for EACCES when ignore_EACCES is set). """
    try:
        return os.stat(filename)
    except OSError as e:
        if e.errno in (errno.ENOENT, errno.ENOTDIR):
            return None
        if ignore_EACCES and e.errno == errno.EACCES:
            return None
        raise

def _getloginuid():
    """ Get the audit-uid/login-uid, if available. os.getuid() is returned
        instead if there was a problem. Note that no caching is done here. """
    #  We might normally call audit.audit_getloginuid(), except that requires
    # importing all of the audit module. And it doesn't work anyway: BZ 518721
    try:
        with open("/proc/self/loginuid") as fo:
            data = fo.read()
            return int(data)
    except (IOError, ValueError):
        return os.getuid()

_cached_getloginuid = None
def getloginuid():
    """ Get the audit-uid/login-uid, if available. os.getuid() is returned
        instead if there was a problem. The value is cached, so you don't
        have to save it. """
    global _cached_getloginuid
    if _cached_getloginuid is None:
        _cached_getloginuid = _getloginuid()
    return _cached_getloginuid


def decompress(filename, dest=None, check_timestamps=False):
    """Take a filename and decompress it into the same relative location
       (dropping the compression suffix unless dest is given). When the
       compression type is not recognized (or the file is not compressed),
       the content of the file is copied to the destination."""

    if dest:
        out = dest
    else:
        out = None
        dot_pos = filename.rfind('.')
        if dot_pos > 0:
            ext = filename[dot_pos:]
            if ext in ('.zck', '.xz', '.bz2', '.gz', '.lzma', '.zst'):
                out = filename[:dot_pos]
        if out is None:
            raise dnf.exceptions.MiscError("Could not determine destination filename")

    if check_timestamps:
        fi = stat_f(filename)
        fo = stat_f(out)
        if fi and fo and fo.st_mtime == fi.st_mtime:
            return out

    try:
        # libdnf.utils.decompress either decompresses the file to the destination
        # or copies the content if the compression type is not recognized
        libdnf.utils.decompress(filename, out, 0o644)
    except RuntimeError as e:
        raise dnf.exceptions.MiscError(str(e))

    if check_timestamps and fi:
        os.utime(out, (fi.st_mtime, fi.st_mtime))

    return out
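
# A minimal usage sketch (the paths below are hypothetical): decompress a
# downloaded metadata file next to itself, skipping the work when the
# uncompressed copy already carries the same mtime:
#     plain = decompress('/var/cache/dnf/repo/filelists.xml.gz',
#                        check_timestamps=True)
#     # plain == '/var/cache/dnf/repo/filelists.xml'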

def calculate_repo_gen_dest(filename, generated_name):
    dest = os.path.dirname(filename)
    dest += '/gen'
    if not os.path.exists(dest):
        os.makedirs(dest, mode=0o755)
    return dest + '/' + generated_name


def repo_gen_decompress(filename, generated_name):
    """ This is a wrapper around decompress, where we work out a cached
        generated name and use check_timestamps. filename _must_ be from
        a repo, and generated_name is the type of the file. """

    dest = calculate_repo_gen_dest(filename, generated_name)
    return decompress(filename, dest=dest, check_timestamps=True)

def read_in_items_from_dot_dir(thisglob, line_as_list=True):
    """ Takes a glob of a dir (like /etc/foo.d/\\*.foo) and returns a list of
        all the lines in all the files matching that glob, ignoring comments
        and blank lines. The optional parameter 'line_as_list' tells whether
        to treat each line as a space- or comma-separated list; defaults to
        True.
    """
    results = []
    for fname in glob.glob(thisglob):
        with open(fname) as f:
            for line in f:
                if re.match(r'\s*(#|$)', line):
                    continue
                line = line.rstrip()  # no more trailing \n's
                line = line.lstrip()  # be nice
                if not line:
                    continue
                if line_as_list:
                    line = line.replace('\n', ' ')
                    line = line.replace(',', ' ')
                    results.extend(line.split())
                    continue
                results.append(line)
    return results
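# Illustrative use (the glob below is hypothetical): collect every
# whitespace- or comma-separated entry from a drop-in directory,
# skipping comments and blank lines:
#     items = read_in_items_from_dot_dir('/etc/dnf/protected.d/*.conf')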