1 2from __future__ import print_function 3 4import os 5import re 6import struct 7from . import packagequery 8from osc.util.helper import decode_it 9 10def cmp(a, b): 11 return (a > b) - (a < b) 12 13class RpmError(packagequery.PackageError): 14 pass 15 16class RpmHeaderError(RpmError): 17 pass 18 19class RpmHeader: 20 """corresponds more or less to the indexEntry_s struct""" 21 def __init__(self, offset, length): 22 self.offset = offset 23 # length of the data section (without length of indexEntries) 24 self.length = length 25 self.entries = [] 26 27 def append(self, entry): 28 self.entries.append(entry) 29 30 def gettag(self, tag): 31 for i in self.entries: 32 if i.tag == tag: 33 return i 34 return None 35 36 def __iter__(self): 37 for i in self.entries: 38 yield i 39 40 def __len__(self): 41 return len(self.entries) 42 43class RpmHeaderEntry: 44 """corresponds to the entryInfo_s struct (except the data attribute)""" 45 46 # each element represents an int 47 ENTRY_SIZE = 16 48 def __init__(self, tag, type, offset, count): 49 self.tag = tag 50 self.type = type 51 self.offset = offset 52 self.count = count 53 self.data = None 54 55class RpmQuery(packagequery.PackageQuery, packagequery.PackageQueryResult): 56 LEAD_SIZE = 96 57 LEAD_MAGIC = 0xedabeedb 58 HEADER_MAGIC = 0x8eade801 59 HEADERSIG_TYPE = 5 60 61 LESS = 1 << 1 62 GREATER = 1 << 2 63 EQUAL = 1 << 3 64 65 SENSE_STRONG = 1 << 27 66 67 default_tags = (1000, 1001, 1002, 1003, 1004, 1022, 1005, 1020, 68 1047, 1112, 1113, # provides 69 1049, 1048, 1050, # requires 70 1054, 1053, 1055, # conflicts 71 1090, 1114, 1115, # obsoletes 72 1156, 1158, 1157, # oldsuggests 73 5046, 5047, 5048, # recommends 74 5049, 5051, 5050, # suggests 75 5052, 5053, 5054, # supplements 76 5055, 5056, 5057 # enhances 77 ) 78 79 def __init__(self, fh): 80 self.__file = fh 81 self.__path = os.path.abspath(fh.name) 82 self.filename_suffix = 'rpm' 83 self.header = None 84 85 def read(self, all_tags=False, self_provides=True, *extra_tags, **extra_kw): 86 # self_provides is unused because a rpm always has a self provides 87 self.__read_lead() 88 data = self.__file.read(RpmHeaderEntry.ENTRY_SIZE) 89 hdrmgc, reserved, il, dl = struct.unpack('!I3i', data) 90 if self.HEADER_MAGIC != hdrmgc: 91 raise RpmHeaderError(self.__path, 'invalid headermagic \'%s\'' % hdrmgc) 92 # skip signature header for now 93 size = il * RpmHeaderEntry.ENTRY_SIZE + dl 94 # data is 8 byte aligned 95 pad = (size + 7) & ~7 96 querysig = extra_kw.get('querysig') 97 if not querysig: 98 self.__file.read(pad) 99 data = self.__file.read(RpmHeaderEntry.ENTRY_SIZE) 100 hdrmgc, reserved, il, dl = struct.unpack('!I3i', data) 101 self.header = RpmHeader(pad, dl) 102 if self.HEADER_MAGIC != hdrmgc: 103 raise RpmHeaderError(self.__path, 'invalid headermagic \'%s\'' % hdrmgc) 104 data = self.__file.read(il * RpmHeaderEntry.ENTRY_SIZE) 105 while len(data) > 0: 106 ei = struct.unpack('!4i', data[:RpmHeaderEntry.ENTRY_SIZE]) 107 self.header.append(RpmHeaderEntry(*ei)) 108 data = data[RpmHeaderEntry.ENTRY_SIZE:] 109 data = self.__file.read(self.header.length) 110 for i in self.header: 111 if i.tag in self.default_tags + extra_tags or all_tags: 112 try: # this may fail for -debug* packages 113 self.__read_data(i, data) 114 except: pass 115 return self 116 117 def __read_lead(self): 118 data = self.__file.read(self.LEAD_SIZE) 119 leadmgc, = struct.unpack('!I', data[:4]) 120 if leadmgc != self.LEAD_MAGIC: 121 raise RpmError(self.__path, 'not a rpm (invalid lead magic \'%s\')' % leadmgc) 122 sigtype, = struct.unpack('!h', data[78:80]) 123 if sigtype != self.HEADERSIG_TYPE: 124 raise RpmError(self.__path, 'invalid header signature \'%s\'' % sigtype) 125 126 def __read_data(self, entry, data): 127 off = entry.offset 128 if entry.type == 2: 129 entry.data = struct.unpack('!%dc' % entry.count, data[off:off + 1 * entry.count]) 130 if entry.type == 3: 131 entry.data = struct.unpack('!%dh' % entry.count, data[off:off + 2 * entry.count]) 132 elif entry.type == 4: 133 entry.data = struct.unpack('!%di' % entry.count, data[off:off + 4 * entry.count]) 134 elif entry.type == 6: 135 entry.data = unpack_string(data[off:]) 136 elif entry.type == 7: 137 entry.data = data[off:off + entry.count] 138 elif entry.type == 8 or entry.type == 9: 139 cnt = entry.count 140 entry.data = [] 141 while cnt > 0: 142 cnt -= 1 143 s = unpack_string(data[off:]) 144 # also skip '\0' 145 off += len(s) + 1 146 entry.data.append(s) 147 if entry.type == 8: 148 return 149 lang = os.getenv('LANGUAGE') or os.getenv('LC_ALL') \ 150 or os.getenv('LC_MESSAGES') or os.getenv('LANG') 151 if lang is None: 152 entry.data = entry.data[0] 153 return 154 # get private i18n table 155 table = self.header.gettag(100) 156 # just care about the country code 157 lang = lang.split('_', 1)[0] 158 cnt = 0 159 for i in table.data: 160 if cnt > len(entry.data) - 1: 161 break 162 if i == lang: 163 entry.data = entry.data[cnt] 164 return 165 cnt += 1 166 entry.data = entry.data[0] 167 else: 168 raise RpmHeaderError(self.__path, 'unsupported tag type \'%d\' (tag: \'%s\'' % (entry.type, entry.tag)) 169 170 def __reqprov(self, tag, flags, version, strong=None): 171 pnames = self.header.gettag(tag) 172 if not pnames: 173 return [] 174 pnames = pnames.data 175 pflags = self.header.gettag(flags).data 176 pvers = self.header.gettag(version).data 177 if not (pnames and pflags and pvers): 178 raise RpmError(self.__path, 'cannot get provides/requires, tags are missing') 179 res = [] 180 for name, flags, ver in zip(pnames, pflags, pvers): 181 if strong is not None: 182 # compat code for the obsolete RPMTAG_OLDSUGGESTSNAME tag 183 # strong == 1 => return only "recommends" 184 # strong == 0 => return only "suggests" 185 if strong == 1: 186 strong = self.SENSE_STRONG 187 if (flags & self.SENSE_STRONG) != strong: 188 continue 189 # RPMSENSE_SENSEMASK = 15 (see rpmlib.h) but ignore RPMSENSE_SERIAL (= 1 << 0) therefore use 14 190 if flags & 14: 191 name += b' ' 192 if flags & self.GREATER: 193 name += b'>' 194 elif flags & self.LESS: 195 name += b'<' 196 if flags & self.EQUAL: 197 name += b'=' 198 name += b' %s' % ver 199 res.append(name) 200 return res 201 202 def vercmp(self, rpmq): 203 res = RpmQuery.rpmvercmp(str(self.epoch()), str(rpmq.epoch())) 204 if res != 0: 205 return res 206 res = RpmQuery.rpmvercmp(self.version(), rpmq.version()) 207 if res != 0: 208 return res 209 res = RpmQuery.rpmvercmp(self.release(), rpmq.release()) 210 return res 211 212 # XXX: create dict for the tag => number mapping?! 213 def name(self): 214 return self.header.gettag(1000).data 215 216 def version(self): 217 return self.header.gettag(1001).data 218 219 def release(self): 220 return self.header.gettag(1002).data 221 222 def epoch(self): 223 epoch = self.header.gettag(1003) 224 if epoch is None: 225 return 0 226 return epoch.data[0] 227 228 def arch(self): 229 return self.header.gettag(1022).data 230 231 def summary(self): 232 return self.header.gettag(1004).data 233 234 def description(self): 235 return self.header.gettag(1005).data 236 237 def url(self): 238 entry = self.header.gettag(1020) 239 if entry is None: 240 return None 241 return entry.data 242 243 def path(self): 244 return self.__path 245 246 def provides(self): 247 return self.__reqprov(1047, 1112, 1113) 248 249 def requires(self): 250 return self.__reqprov(1049, 1048, 1050) 251 252 def conflicts(self): 253 return self.__reqprov(1054, 1053, 1055) 254 255 def obsoletes(self): 256 return self.__reqprov(1090, 1114, 1115) 257 258 def recommends(self): 259 recommends = self.__reqprov(5046, 5048, 5047) 260 if not recommends: 261 recommends = self.__reqprov(1156, 1158, 1157, 1) 262 return recommends 263 264 def suggests(self): 265 suggests = self.__reqprov(5049, 5051, 5050) 266 if not suggests: 267 suggests = self.__reqprov(1156, 1158, 1157, 0) 268 return suggests 269 270 def supplements(self): 271 return self.__reqprov(5052, 5054, 5053) 272 273 def enhances(self): 274 return self.__reqprov(5055, 5057, 5506) 275 276 def is_src(self): 277 # SOURCERPM = 1044 278 return self.gettag(1044) is None 279 280 def is_nosrc(self): 281 # NOSOURCE = 1051, NOPATCH = 1052 282 return self.is_src() and \ 283 (self.gettag(1051) is not None or self.gettag(1052) is not None) 284 285 def gettag(self, num): 286 return self.header.gettag(num) 287 288 def canonname(self): 289 if self.is_nosrc(): 290 arch = b'nosrc' 291 elif self.is_src(): 292 arch = b'src' 293 else: 294 arch = self.arch() 295 return RpmQuery.filename(self.name(), None, self.version(), self.release(), arch) 296 297 @staticmethod 298 def query(filename): 299 f = open(filename, 'rb') 300 rpmq = RpmQuery(f) 301 rpmq.read() 302 f.close() 303 return rpmq 304 305 @staticmethod 306 def queryhdrmd5(filename): 307 f = open(filename, 'rb') 308 rpmq = RpmQuery(f) 309 rpmq.read(1004, querysig=True) 310 f.close() 311 entry = rpmq.gettag(1004) 312 if entry is None: 313 return None 314 return ''.join([ "%02x" % x for x in struct.unpack('16B', entry.data) ]) 315 316 @staticmethod 317 def rpmvercmp(ver1, ver2): 318 """ 319 implementation of RPM's version comparison algorithm 320 (as described in lib/rpmvercmp.c) 321 """ 322 if ver1 == ver2: 323 return 0 324 res = 0 325 ver1 = decode_it(ver1) 326 ver2 = decode_it(ver2) 327 while res == 0: 328 # remove all leading non alphanumeric or tilde chars 329 ver1 = re.sub('^[^a-zA-Z0-9~]*', '', ver1) 330 ver2 = re.sub('^[^a-zA-Z0-9~]*', '', ver2) 331 if ver1.startswith('~') or ver2.startswith('~'): 332 if not ver1.startswith('~'): 333 return 1 334 elif not ver2.startswith('~'): 335 return -1 336 ver1 = ver1[1:] 337 ver2 = ver2[1:] 338 continue 339 340 if not (len(ver1) and len(ver2)): 341 break 342 343 # check if we have a digits segment 344 mo1 = re.match('(\d+)', ver1) 345 mo2 = re.match('(\d+)', ver2) 346 numeric = True 347 if mo1 is None: 348 mo1 = re.match('([a-zA-Z]+)', ver1) 349 mo2 = re.match('([a-zA-Z]+)', ver2) 350 numeric = False 351 # check for different types: alpha and numeric 352 if mo2 is None: 353 if numeric: 354 return 1 355 return -1 356 seg1 = mo1.group(0) 357 ver1 = ver1[mo1.end(0):] 358 seg2 = mo2.group(1) 359 ver2 = ver2[mo2.end(1):] 360 if numeric: 361 # remove leading zeros 362 seg1 = re.sub('^0+', '', seg1) 363 seg2 = re.sub('^0+', '', seg2) 364 # longer digit segment wins - if both have the same length 365 # a simple ascii compare decides 366 res = len(seg1) - len(seg2) or cmp(seg1, seg2) 367 else: 368 res = cmp(seg1, seg2) 369 if res > 0: 370 return 1 371 elif res < 0: 372 return -1 373 return cmp(ver1, ver2) 374 375 @staticmethod 376 def filename(name, epoch, version, release, arch): 377 return b'%s-%s-%s.%s.rpm' % (name, version, release, arch) 378 379def unpack_string(data, encoding=None): 380 """unpack a '\\0' terminated string from data""" 381 idx = data.find(b'\0') 382 if idx == -1: 383 raise ValueError('illegal string: not \\0 terminated') 384 data = data[:idx] 385 if encoding is not None: 386 data = data.decode(encoding) 387 return data 388 389if __name__ == '__main__': 390 import sys 391 try: 392 rpmq = RpmQuery.query(sys.argv[1]) 393 except RpmError as e: 394 print(e.msg) 395 sys.exit(2) 396 print(rpmq.name(), rpmq.version(), rpmq.release(), rpmq.arch(), rpmq.url()) 397 print(rpmq.summary()) 398 print(rpmq.description()) 399 print('##########') 400 print('\n'.join(rpmq.provides())) 401 print('##########') 402 print('\n'.join(rpmq.requires())) 403 print('##########') 404 print(RpmQuery.queryhdrmd5(sys.argv[1])) 405