1
2from __future__ import print_function
3
4import os
5import re
6import struct
7from . import packagequery
8from osc.util.helper import decode_it
9
def cmp(a, b):
    """Three-way comparison, a stand-in for the Python 2 builtin cmp().

    Returns 1 if a is greater than b, -1 if a is less than b and 0 otherwise.
    """
    if a > b:
        return 1
    if a < b:
        return -1
    return 0
12
class RpmError(packagequery.PackageError):
    """Base class of the errors raised while querying a rpm file."""
15
class RpmHeaderError(RpmError):
    """Raised when a rpm header is invalid or holds an unsupported tag type."""
18
class RpmHeader:
    """Ordered collection of the index entries of one rpm header.

    Corresponds more or less to the indexEntry_s struct.
    """
    def __init__(self, offset, length):
        # length covers the data section only (the index entries are not included)
        self.offset, self.length = offset, length
        self.entries = []

    def append(self, entry):
        """Add entry at the end of the entry list."""
        self.entries.append(entry)

    def gettag(self, tag):
        """Return the first entry whose tag equals tag or None if there is no such entry."""
        return next((entry for entry in self.entries if entry.tag == tag), None)

    def __iter__(self):
        """Iterate over the entries in insertion order."""
        return iter(self.entries)

    def __len__(self):
        """Return the number of entries."""
        return len(self.entries)
42
class RpmHeaderEntry:
    """A single index entry of a rpm header.

    Corresponds to the entryInfo_s struct (except the data attribute).
    """

    # size of a packed entry in bytes; each of its elements represents an int
    ENTRY_SIZE = 16

    def __init__(self, tag, type, offset, count):
        self.tag, self.type = tag, type
        self.offset, self.count = offset, count
        # the entry's payload, None as long as nothing has been stored here
        self.data = None
54
class RpmQuery(packagequery.PackageQuery, packagequery.PackageQueryResult):
    """Read package metadata from the headers of a rpm file.

    An instance wraps a file object that was opened in binary mode. read()
    parses the lead and a header index and loads the data of the wanted tags;
    afterwards the accessor methods return the data of individual tags.
    String data is kept as bytes.
    """
    LEAD_SIZE = 96
    LEAD_MAGIC = 0xedabeedb
    HEADER_MAGIC = 0x8eade801
    HEADERSIG_TYPE = 5

    # comparison bits in the flags value of a dependency
    LESS = 1 << 1
    GREATER = 1 << 2
    EQUAL = 1 << 3

    # distinguishes "recommends" from "suggests" in the old suggests tags
    SENSE_STRONG = 1 << 27

    default_tags = (1000, 1001, 1002, 1003, 1004, 1022, 1005, 1020,
        1047, 1112, 1113, # provides
        1049, 1048, 1050, # requires
        1054, 1053, 1055, # conflicts
        1090, 1114, 1115, # obsoletes
        1156, 1158, 1157, # oldsuggests
        5046, 5047, 5048, # recommends
        5049, 5051, 5050, # suggests
        5052, 5053, 5054, # supplements
        5055, 5056, 5057  # enhances
    )

    def __init__(self, fh):
        """fh is a file object of the rpm opened in binary mode; fh.name has to be its path."""
        self.__file = fh
        self.__path = os.path.abspath(fh.name)
        self.filename_suffix = 'rpm'
        self.header = None

    def read(self, all_tags=False, self_provides=True, *extra_tags, **extra_kw):
        """Parse the rpm file and load the data of the wanted tags.

        all_tags: if true, the data of every tag in the header is loaded
        self_provides: unused because a rpm always has a self provides
        extra_tags: tag numbers to load in addition to default_tags
        extra_kw: if 'querysig' is true the signature header is parsed,
                  otherwise it is skipped and the following header is parsed

        Returns self. Raises RpmError if the lead is invalid and
        RpmHeaderError if a header is invalid or truncated. Entries whose data
        cannot be decoded are silently left without data.
        """
        self.__read_lead()
        # the signature header comes first
        il, dl = self.__read_intro()
        size = il * RpmHeaderEntry.ENTRY_SIZE + dl
        # data is 8 byte aligned
        pad = (size + 7) & ~7
        if not extra_kw.get('querysig'):
            # skip signature header for now
            self.__file.read(pad)
            il, dl = self.__read_intro()
        self.header = RpmHeader(pad, dl)
        index_size = il * RpmHeaderEntry.ENTRY_SIZE
        data = self.__file.read(index_size)
        if len(data) != index_size:
            raise RpmHeaderError(self.__path, 'truncated header index')
        # unpack the entries in place instead of re-slicing the buffer
        for pos in range(0, index_size, RpmHeaderEntry.ENTRY_SIZE):
            ei = struct.unpack_from('!4i', data, pos)
            self.header.append(RpmHeaderEntry(*ei))
        data = self.__file.read(self.header.length)
        wanted = self.default_tags + extra_tags
        for entry in self.header:
            if all_tags or entry.tag in wanted:
                try:
                    self.__read_data(entry, data)
                except Exception:
                    # deliberately best effort: this may fail for -debug* packages
                    pass
        return self

    def __read_lead(self):
        """Read the lead and check its magic and its signature type."""
        data = self.__file.read(self.LEAD_SIZE)
        if len(data) < self.LEAD_SIZE:
            raise RpmError(self.__path, 'not a rpm (lead is too short)')
        leadmgc, = struct.unpack('!I', data[:4])
        if leadmgc != self.LEAD_MAGIC:
            raise RpmError(self.__path, 'not a rpm (invalid lead magic \'%s\')' % leadmgc)
        sigtype, = struct.unpack('!h', data[78:80])
        if sigtype != self.HEADERSIG_TYPE:
            raise RpmError(self.__path, 'invalid header signature \'%s\'' % sigtype)

    def __read_intro(self):
        """Read a header intro and return (number of index entries, length of the data section)."""
        data = self.__file.read(RpmHeaderEntry.ENTRY_SIZE)
        if len(data) < RpmHeaderEntry.ENTRY_SIZE:
            raise RpmHeaderError(self.__path, 'truncated header')
        hdrmgc, reserved, il, dl = struct.unpack('!I3i', data)
        if self.HEADER_MAGIC != hdrmgc:
            raise RpmHeaderError(self.__path, 'invalid headermagic \'%s\'' % hdrmgc)
        if il < 0 or dl < 0:
            # a negative size would make file.read() consume the rest of the file
            raise RpmHeaderError(self.__path, 'invalid header size')
        return il, dl

    def __read_data(self, entry, data):
        """Decode the data of entry from the data section and store it in entry.data.

        Types 2, 3 and 4 yield a tuple of entry.count unpacked values, type 6 a
        string, type 7 a slice of entry.count bytes, type 8 a list of strings
        and type 9 the string selected for the user's language.
        Raises RpmHeaderError for any other type.
        """
        off = entry.offset
        if entry.type == 2:
            entry.data = struct.unpack('!%dc' % entry.count, data[off:off + 1 * entry.count])
        elif entry.type == 3:
            entry.data = struct.unpack('!%dh' % entry.count, data[off:off + 2 * entry.count])
        elif entry.type == 4:
            entry.data = struct.unpack('!%di' % entry.count, data[off:off + 4 * entry.count])
        elif entry.type == 6:
            entry.data = unpack_string(data[off:])
        elif entry.type == 7:
            entry.data = data[off:off + entry.count]
        elif entry.type == 8 or entry.type == 9:
            # the list is assigned up front so that the strings unpacked so
            # far are kept if a later string is broken
            entry.data = strings = []
            for _ in range(entry.count):
                s = unpack_string(data[off:])
                # also skip '\0'
                off += len(s) + 1
                strings.append(s)
            if entry.type == 8:
                return
            entry.data = strings[self.__translation_index(len(strings), data)]
        else:
            raise RpmHeaderError(self.__path, 'unsupported tag type \'%d\' (tag: \'%s\')' % (entry.type, entry.tag))

    def __translation_index(self, count, data):
        """Return the index of the i18n string for the user's language (0 if there is none).

        count is the number of available strings and data the data section
        of the header (needed to load the i18n table on demand).
        """
        lang = os.getenv('LANGUAGE') or os.getenv('LC_ALL') \
            or os.getenv('LC_MESSAGES') or os.getenv('LANG')
        if not lang:
            return 0
        # get private i18n table
        table = self.header.gettag(100)
        if table is None:
            return 0
        if table.data is None:
            # the table is not part of default_tags, so load it on demand
            try:
                self.__read_data(table, data)
            except (ValueError, struct.error, RpmError):
                return 0
        # just care about the language code; the strings of the header are bytes
        lang = lang.split('_', 1)[0]
        if not isinstance(lang, bytes):
            lang = lang.encode('utf-8', 'replace')
        for idx, locale in enumerate(table.data or ()):
            if idx >= count:
                break
            if locale == lang:
                return idx
        return 0

    def __reqprov(self, tag, flags, version, strong=None):
        """Return the dependencies stored in the given name, flags and version tags.

        Each dependency is the name, followed by ' <op> <version>' if its flags
        contain a comparison. strong filters the old suggests tag: 1 returns
        only "recommends", 0 only "suggests" and None disables the filter.
        Returns [] if the name tag does not exist and raises RpmError if
        one of the tags has no data.
        """
        names_entry = self.header.gettag(tag)
        if names_entry is None:
            return []
        flags_entry = self.header.gettag(flags)
        vers_entry = self.header.gettag(version)
        pnames = names_entry.data
        pflags = None if flags_entry is None else flags_entry.data
        pvers = None if vers_entry is None else vers_entry.data
        if not (pnames and pflags and pvers):
            raise RpmError(self.__path, 'cannot get provides/requires, tags are missing')
        # compat code for the obsolete RPMTAG_OLDSUGGESTSNAME tag
        # strong == 1 => return only "recommends"
        # strong == 0 => return only "suggests"
        if strong == 1:
            strong = self.SENSE_STRONG
        res = []
        for name, flag, ver in zip(pnames, pflags, pvers):
            if strong is not None and (flag & self.SENSE_STRONG) != strong:
                continue
            # RPMSENSE_SENSEMASK = 15 (see rpmlib.h) but ignore RPMSENSE_SERIAL (= 1 << 0) therefore use 14
            if flag & 14:
                name += b' '
                if flag & self.GREATER:
                    name += b'>'
                elif flag & self.LESS:
                    name += b'<'
                if flag & self.EQUAL:
                    name += b'='
                name += b' %s' % ver
            res.append(name)
        return res

    def vercmp(self, rpmq):
        """Compare epoch, version and release with those of rpmq; return -1, 0 or 1."""
        res = RpmQuery.rpmvercmp(str(self.epoch()), str(rpmq.epoch()))
        if res != 0:
            return res
        res = RpmQuery.rpmvercmp(self.version(), rpmq.version())
        if res != 0:
            return res
        res = RpmQuery.rpmvercmp(self.release(), rpmq.release())
        return res

    # XXX: create dict for the tag => number mapping?!
    def name(self):
        """Return the data of tag 1000 (name)."""
        return self.header.gettag(1000).data

    def version(self):
        """Return the data of tag 1001 (version)."""
        return self.header.gettag(1001).data

    def release(self):
        """Return the data of tag 1002 (release)."""
        return self.header.gettag(1002).data

    def epoch(self):
        """Return the first value of tag 1003 (epoch) or 0 if the tag does not exist."""
        epoch = self.header.gettag(1003)
        if epoch is None:
            return 0
        return epoch.data[0]

    def arch(self):
        """Return the data of tag 1022 (arch)."""
        return self.header.gettag(1022).data

    def summary(self):
        """Return the data of tag 1004 (summary)."""
        return self.header.gettag(1004).data

    def description(self):
        """Return the data of tag 1005 (description)."""
        return self.header.gettag(1005).data

    def url(self):
        """Return the data of tag 1020 (url) or None if the tag does not exist."""
        entry = self.header.gettag(1020)
        if entry is None:
            return None
        return entry.data

    def path(self):
        """Return the absolute path of the rpm file."""
        return self.__path

    def provides(self):
        return self.__reqprov(1047, 1112, 1113)

    def requires(self):
        return self.__reqprov(1049, 1048, 1050)

    def conflicts(self):
        return self.__reqprov(1054, 1053, 1055)

    def obsoletes(self):
        return self.__reqprov(1090, 1114, 1115)

    def recommends(self):
        """Return the recommends; falls back to the strong entries of the old suggests tag."""
        recommends = self.__reqprov(5046, 5048, 5047)
        if not recommends:
            recommends = self.__reqprov(1156, 1158, 1157, 1)
        return recommends

    def suggests(self):
        """Return the suggests; falls back to the non-strong entries of the old suggests tag."""
        suggests = self.__reqprov(5049, 5051, 5050)
        if not suggests:
            suggests = self.__reqprov(1156, 1158, 1157, 0)
        return suggests

    def supplements(self):
        return self.__reqprov(5052, 5054, 5053)

    def enhances(self):
        # name, flags and version tag (the version tag is 5056, see default_tags)
        return self.__reqprov(5055, 5057, 5056)

    def is_src(self):
        """Return True if the header has no SOURCERPM tag."""
        # SOURCERPM = 1044
        return self.gettag(1044) is None

    def is_nosrc(self):
        """Return True for a source rpm that has a NOSOURCE or a NOPATCH tag."""
        # NOSOURCE = 1051, NOPATCH = 1052
        return self.is_src() and \
            (self.gettag(1051) is not None or self.gettag(1052) is not None)

    def gettag(self, num):
        """Return the header entry of tag num or None if the header does not contain it."""
        return self.header.gettag(num)

    def canonname(self):
        """Return the canonical file name; the arch is 'nosrc' or 'src' for source rpms."""
        if self.is_nosrc():
            arch = b'nosrc'
        elif self.is_src():
            arch = b'src'
        else:
            arch = self.arch()
        return RpmQuery.filename(self.name(), None, self.version(), self.release(), arch)

    @staticmethod
    def query(filename):
        """Open filename, read the default tags and return the RpmQuery object."""
        # the context manager closes the file even if read() raises
        with open(filename, 'rb') as f:
            rpmq = RpmQuery(f)
            rpmq.read()
        return rpmq

    @staticmethod
    def queryhdrmd5(filename):
        """Return the 16 bytes of tag 1004 of the signature header as hex string.

        Returns None if the signature header has no such tag.
        """
        with open(filename, 'rb') as f:
            rpmq = RpmQuery(f)
            # 1004 is passed as an extra tag (and not as the all_tags flag)
            rpmq.read(False, True, 1004, querysig=True)
        entry = rpmq.gettag(1004)
        if entry is None:
            return None
        return ''.join([ "%02x" % x for x in struct.unpack('16B', entry.data) ])

    @staticmethod
    def rpmvercmp(ver1, ver2):
        """
        implementation of RPM's version comparison algorithm
        (as described in lib/rpmvercmp.c)

        Returns 1 if ver1 is newer than ver2, -1 if it is older and 0 if
        both are equal.
        """
        if ver1 == ver2:
            return 0
        res = 0
        ver1 = decode_it(ver1)
        ver2 = decode_it(ver2)
        while res == 0:
            # remove all leading non alphanumeric or tilde chars
            ver1 = re.sub(r'^[^a-zA-Z0-9~]*', '', ver1)
            ver2 = re.sub(r'^[^a-zA-Z0-9~]*', '', ver2)
            if ver1.startswith('~') or ver2.startswith('~'):
                # a tilde sorts before everything, even before the end of the version
                if not ver1.startswith('~'):
                    return 1
                elif not ver2.startswith('~'):
                    return -1
                ver1 = ver1[1:]
                ver2 = ver2[1:]
                continue

            if not (len(ver1) and len(ver2)):
                break

            # check if we have a digits segment
            mo1 = re.match(r'(\d+)', ver1)
            mo2 = re.match(r'(\d+)', ver2)
            numeric = True
            if mo1 is None:
                mo1 = re.match(r'([a-zA-Z]+)', ver1)
                mo2 = re.match(r'([a-zA-Z]+)', ver2)
                numeric = False
            # check for different types: alpha and numeric
            if mo2 is None:
                if numeric:
                    return 1
                return -1
            seg1 = mo1.group(0)
            ver1 = ver1[mo1.end(0):]
            seg2 = mo2.group(1)
            ver2 = ver2[mo2.end(1):]
            if numeric:
                # remove leading zeros
                seg1 = re.sub(r'^0+', '', seg1)
                seg2 = re.sub(r'^0+', '', seg2)
                # longer digit segment wins - if both have the same length
                # a simple ascii compare decides
                res = len(seg1) - len(seg2) or cmp(seg1, seg2)
            else:
                res = cmp(seg1, seg2)
        if res > 0:
            return 1
        elif res < 0:
            return -1
        # all segments were equal: the version with remaining characters wins
        return cmp(ver1, ver2)

    @staticmethod
    def filename(name, epoch, version, release, arch):
        """Return b'<name>-<version>-<release>.<arch>.rpm'; epoch is not part of the name."""
        return b'%s-%s-%s.%s.rpm' % (name, version, release, arch)
378
def unpack_string(data, encoding=None):
    """Return the part of data in front of its first '\\0'.

    The result is decoded with encoding if one is given, otherwise it is
    returned as is. Raises ValueError if data contains no '\\0'.
    """
    end = data.find(b'\0')
    if end < 0:
        raise ValueError('illegal string: not \\0 terminated')
    raw = data[:end]
    return raw if encoding is None else raw.decode(encoding)
388
if __name__ == '__main__':
    # small command line driver: print the metadata of the rpm given as first argument
    import sys

    def _text(value):
        """Decode bytes for printing; other values (e.g. None) are returned unchanged."""
        if isinstance(value, bytes):
            return decode_it(value)
        return value

    if len(sys.argv) < 2:
        print('usage: %s <rpm file>' % sys.argv[0], file=sys.stderr)
        sys.exit(1)
    try:
        rpmq = RpmQuery.query(sys.argv[1])
    except RpmError as e:
        print(e.msg)
        sys.exit(2)
    print(_text(rpmq.name()), _text(rpmq.version()), _text(rpmq.release()),
          _text(rpmq.arch()), _text(rpmq.url()))
    print(_text(rpmq.summary()))
    print(_text(rpmq.description()))
    print('##########')
    # the dependency lists contain bytes, which cannot be joined with a str separator
    print('\n'.join(_text(dep) for dep in rpmq.provides()))
    print('##########')
    print('\n'.join(_text(dep) for dep in rpmq.requires()))
    print('##########')
    print(RpmQuery.queryhdrmd5(sys.argv[1]))
405