1# misc.py 2# Copyright (C) 2012-2016 Red Hat, Inc. 3# 4# This copyrighted material is made available to anyone wishing to use, 5# modify, copy, or redistribute it subject to the terms and conditions of 6# the GNU General Public License v.2, or (at your option) any later version. 7# This program is distributed in the hope that it will be useful, but WITHOUT 8# ANY WARRANTY expressed or implied, including the implied warranties of 9# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General 10# Public License for more details. You should have received a copy of the 11# GNU General Public License along with this program; if not, write to the 12# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 13# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the 14# source code or documentation are not subject to the GNU General Public 15# License and may only be used or replicated with the express permission of 16# Red Hat, Inc. 17# 18 19""" 20Assorted utility functions for yum. 21""" 22 23from __future__ import print_function, absolute_import 24from __future__ import unicode_literals 25from dnf.pycomp import base64_decodebytes, basestring, unicode 26from stat import * 27import libdnf.utils 28import dnf.const 29import dnf.crypto 30import dnf.exceptions 31import dnf.i18n 32import errno 33import glob 34import io 35import os 36import os.path 37import pwd 38import re 39import shutil 40import tempfile 41 42_default_checksums = ['sha256'] 43 44 45_re_compiled_glob_match = None 46def re_glob(s): 47 """ Tests if a string is a shell wildcard. """ 48 global _re_compiled_glob_match 49 if _re_compiled_glob_match is None: 50 _re_compiled_glob_match = re.compile(r'[*?]|\[.+\]').search 51 return _re_compiled_glob_match(s) 52 53_re_compiled_full_match = None 54def re_full_search_needed(s): 55 """ Tests if a string needs a full nevra match, instead of just name. """ 56 global _re_compiled_full_match 57 if _re_compiled_full_match is None: 58 # A glob, or a "." or "-" separator, followed by something (the ".") 59 one = re.compile(r'.*([-.*?]|\[.+\]).').match 60 # Any epoch, for envra 61 two = re.compile('[0-9]+:').match 62 _re_compiled_full_match = (one, two) 63 for rec in _re_compiled_full_match: 64 if rec(s): 65 return True 66 return False 67 68def get_default_chksum_type(): 69 return _default_checksums[0] 70 71class GenericHolder(object): 72 """Generic Holder class used to hold other objects of known types 73 It exists purely to be able to do object.somestuff, object.someotherstuff 74 or object[key] and pass object to another function that will 75 understand it""" 76 77 def __init__(self, iter=None): 78 self.__iter = iter 79 80 def __iter__(self): 81 if self.__iter is not None: 82 return iter(self[self.__iter]) 83 84 def __getitem__(self, item): 85 if hasattr(self, item): 86 return getattr(self, item) 87 else: 88 raise KeyError(item) 89 90 def all_lists(self): 91 """Return a dictionary of all lists.""" 92 return {key: list_ for key, list_ in vars(self).items() 93 if type(list_) is list} 94 95 def merge_lists(self, other): 96 """ Concatenate the list attributes from 'other' to ours. """ 97 for (key, val) in other.all_lists().items(): 98 vars(self).setdefault(key, []).extend(val) 99 return self 100 101def procgpgkey(rawkey): 102 '''Convert ASCII-armored GPG key to binary 103 ''' 104 105 # Normalise newlines 106 rawkey = re.sub(b'\r\n?', b'\n', rawkey) 107 108 # Extract block 109 block = io.BytesIO() 110 inblock = 0 111 pastheaders = 0 112 for line in rawkey.split(b'\n'): 113 if line.startswith(b'-----BEGIN PGP PUBLIC KEY BLOCK-----'): 114 inblock = 1 115 elif inblock and line.strip() == b'': 116 pastheaders = 1 117 elif inblock and line.startswith(b'-----END PGP PUBLIC KEY BLOCK-----'): 118 # Hit the end of the block, get out 119 break 120 elif pastheaders and line.startswith(b'='): 121 # Hit the CRC line, don't include this and stop 122 break 123 elif pastheaders: 124 block.write(line + b'\n') 125 126 # Decode and return 127 return base64_decodebytes(block.getvalue()) 128 129 130def keyInstalled(ts, keyid, timestamp): 131 ''' 132 Return if the GPG key described by the given keyid and timestamp are 133 installed in the rpmdb. 134 135 The keyid and timestamp should both be passed as integers. 136 The ts is an rpm transaction set object 137 138 Return values: 139 - -1 key is not installed 140 - 0 key with matching ID and timestamp is installed 141 - 1 key with matching ID is installed but has an older timestamp 142 - 2 key with matching ID is installed but has a newer timestamp 143 144 No effort is made to handle duplicates. The first matching keyid is used to 145 calculate the return result. 146 ''' 147 # Search 148 for hdr in ts.dbMatch('name', 'gpg-pubkey'): 149 if hdr['version'] == keyid: 150 installedts = int(hdr['release'], 16) 151 if installedts == timestamp: 152 return 0 153 elif installedts < timestamp: 154 return 1 155 else: 156 return 2 157 158 return -1 159 160 161def import_key_to_pubring(rawkey, keyid, gpgdir=None, make_ro_copy=True): 162 if not os.path.exists(gpgdir): 163 os.makedirs(gpgdir) 164 165 with dnf.crypto.pubring_dir(gpgdir), dnf.crypto.Context() as ctx: 166 # import the key 167 with open(os.path.join(gpgdir, 'gpg.conf'), 'wb') as fp: 168 fp.write(b'') 169 ctx.op_import(rawkey) 170 171 if make_ro_copy: 172 173 rodir = gpgdir + '-ro' 174 if not os.path.exists(rodir): 175 os.makedirs(rodir, mode=0o755) 176 for f in glob.glob(gpgdir + '/*'): 177 basename = os.path.basename(f) 178 ro_f = rodir + '/' + basename 179 shutil.copy(f, ro_f) 180 os.chmod(ro_f, 0o755) 181 # yes it is this stupid, why do you ask? 182 opts = """lock-never 183 no-auto-check-trustdb 184 trust-model direct 185 no-expensive-trust-checks 186 no-permission-warning 187 preserve-permissions 188 """ 189 with open(os.path.join(rodir, 'gpg.conf'), 'w', 0o755) as fp: 190 fp.write(opts) 191 192 193 return True 194 195 196def getCacheDir(): 197 """return a path to a valid and safe cachedir - only used when not running 198 as root or when --tempcache is set""" 199 200 uid = os.geteuid() 201 try: 202 usertup = pwd.getpwuid(uid) 203 username = dnf.i18n.ucd(usertup[0]) 204 prefix = '%s-%s-' % (dnf.const.PREFIX, username) 205 except KeyError: 206 prefix = '%s-%s-' % (dnf.const.PREFIX, uid) 207 208 # check for /var/tmp/prefix-* - 209 dirpath = '%s/%s*' % (dnf.const.TMPDIR, prefix) 210 cachedirs = sorted(glob.glob(dirpath)) 211 for thisdir in cachedirs: 212 stats = os.lstat(thisdir) 213 if S_ISDIR(stats[0]) and S_IMODE(stats[0]) == 448 and stats[4] == uid: 214 return thisdir 215 216 # make the dir (tempfile.mkdtemp()) 217 cachedir = tempfile.mkdtemp(prefix=prefix, dir=dnf.const.TMPDIR) 218 return cachedir 219 220def seq_max_split(seq, max_entries): 221 """ Given a seq, split into a list of lists of length max_entries each. """ 222 ret = [] 223 num = len(seq) 224 seq = list(seq) # Trying to use a set/etc. here is bad 225 beg = 0 226 while num > max_entries: 227 end = beg + max_entries 228 ret.append(seq[beg:end]) 229 beg += max_entries 230 num -= max_entries 231 ret.append(seq[beg:]) 232 return ret 233 234def unlink_f(filename): 235 """ Call os.unlink, but don't die if the file isn't there. This is the main 236 difference between "rm -f" and plain "rm". """ 237 try: 238 os.unlink(filename) 239 except OSError as e: 240 if e.errno != errno.ENOENT: 241 raise 242 243def stat_f(filename, ignore_EACCES=False): 244 """ Call os.stat(), but don't die if the file isn't there. Returns None. """ 245 try: 246 return os.stat(filename) 247 except OSError as e: 248 if e.errno in (errno.ENOENT, errno.ENOTDIR): 249 return None 250 if ignore_EACCES and e.errno == errno.EACCES: 251 return None 252 raise 253 254def _getloginuid(): 255 """ Get the audit-uid/login-uid, if available. os.getuid() is returned 256 instead if there was a problem. Note that no caching is done here. """ 257 # We might normally call audit.audit_getloginuid(), except that requires 258 # importing all of the audit module. And it doesn't work anyway: BZ 518721 259 try: 260 with open("/proc/self/loginuid") as fo: 261 data = fo.read() 262 return int(data) 263 except (IOError, ValueError): 264 return os.getuid() 265 266_cached_getloginuid = None 267def getloginuid(): 268 """ Get the audit-uid/login-uid, if available. os.getuid() is returned 269 instead if there was a problem. The value is cached, so you don't 270 have to save it. """ 271 global _cached_getloginuid 272 if _cached_getloginuid is None: 273 _cached_getloginuid = _getloginuid() 274 return _cached_getloginuid 275 276 277def decompress(filename, dest=None, check_timestamps=False): 278 """take a filename and decompress it into the same relative location. 279 When the compression type is not recognized (or file is not compressed), 280 the content of the file is copied to the destination""" 281 282 if dest: 283 out = dest 284 else: 285 out = None 286 dot_pos = filename.rfind('.') 287 if dot_pos > 0: 288 ext = filename[dot_pos:] 289 if ext in ('.zck', '.xz', '.bz2', '.gz', '.lzma', '.zst'): 290 out = filename[:dot_pos] 291 if out is None: 292 raise dnf.exceptions.MiscError("Could not determine destination filename") 293 294 if check_timestamps: 295 fi = stat_f(filename) 296 fo = stat_f(out) 297 if fi and fo and fo.st_mtime == fi.st_mtime: 298 return out 299 300 try: 301 # libdnf.utils.decompress either decompress file to the destination or 302 # copy the content if the compression type is not recognized 303 libdnf.utils.decompress(filename, out, 0o644) 304 except RuntimeError as e: 305 raise dnf.exceptions.MiscError(str(e)) 306 307 if check_timestamps and fi: 308 os.utime(out, (fi.st_mtime, fi.st_mtime)) 309 310 return out 311 312def calculate_repo_gen_dest(filename, generated_name): 313 dest = os.path.dirname(filename) 314 dest += '/gen' 315 if not os.path.exists(dest): 316 os.makedirs(dest, mode=0o755) 317 return dest + '/' + generated_name 318 319 320def repo_gen_decompress(filename, generated_name): 321 """ This is a wrapper around decompress, where we work out a cached 322 generated name, and use check_timestamps. filename _must_ be from 323 a repo. and generated_name is the type of the file. """ 324 325 dest = calculate_repo_gen_dest(filename, generated_name) 326 return decompress(filename, dest=dest, check_timestamps=True) 327 328def read_in_items_from_dot_dir(thisglob, line_as_list=True): 329 """ Takes a glob of a dir (like /etc/foo.d/\\*.foo) returns a list of all 330 the lines in all the files matching that glob, ignores comments and blank 331 lines, optional paramater 'line_as_list tells whether to treat each line 332 as a space or comma-separated list, defaults to True. 333 """ 334 results = [] 335 for fname in glob.glob(thisglob): 336 with open(fname) as f: 337 for line in f: 338 if re.match(r'\s*(#|$)', line): 339 continue 340 line = line.rstrip() # no more trailing \n's 341 line = line.lstrip() # be nice 342 if not line: 343 continue 344 if line_as_list: 345 line = line.replace('\n', ' ') 346 line = line.replace(',', ' ') 347 results.extend(line.split()) 348 continue 349 results.append(line) 350 return results 351