1import sys 2import rpm 3from rpm._rpm import ts as TransactionSetCore 4 5# TODO: migrate relevant documentation from C-side 6class TransactionSet(TransactionSetCore): 7 _probFilter = 0 8 9 def _wrapSetGet(self, attr, val): 10 oval = getattr(self, attr) 11 setattr(self, attr, val) 12 return oval 13 14 def setVSFlags(self, flags): 15 return self._wrapSetGet('_vsflags', flags) 16 17 def getVSFlags(self): 18 return self._vsflags 19 20 def setVfyFlags(self, flags): 21 return self._wrapSetGet('_vfyflags', flags) 22 23 def getVfyFlags(self): 24 return self._vfyflags 25 26 def getVfyLevel(self): 27 return self._vfylevel 28 29 def setVfyLevel(self, flags): 30 return self._wrapSetGet('_vfylevel', flags) 31 32 def setColor(self, color): 33 return self._wrapSetGet('_color', color) 34 35 def setPrefColor(self, color): 36 return self._wrapSetGet('_prefcolor', color) 37 38 def setFlags(self, flags): 39 return self._wrapSetGet('_flags', flags) 40 41 def setProbFilter(self, ignoreSet): 42 return self._wrapSetGet('_probFilter', ignoreSet) 43 44 def parseSpec(self, specfile): 45 return rpm.spec(specfile) 46 47 def getKeys(self): 48 keys = [] 49 for te in self: 50 keys.append(te.Key()) 51 # Backwards compatibility goo - WTH does this return a *tuple* ?! 52 if not keys: 53 return None 54 else: 55 return tuple(keys) 56 57 def _f2hdr(self, item): 58 if isinstance(item, str): 59 with open(item) as f: 60 header = self.hdrFromFdno(f) 61 elif isinstance(item, rpm.hdr): 62 header = item 63 else: 64 header = self.hdrFromFdno(item) 65 return header 66 67 def addInstall(self, item, key, how="u"): 68 header = self._f2hdr(item) 69 70 if how not in ['u', 'i']: 71 raise ValueError('how argument must be "u" or "i"') 72 upgrade = (how == "u") 73 74 if not TransactionSetCore.addInstall(self, header, key, upgrade): 75 if upgrade: 76 raise rpm.error("adding upgrade to transaction failed") 77 else: 78 raise rpm.error("adding install to transaction failed") 79 80 def addReinstall(self, item, key): 81 header = self._f2hdr(item) 82 83 if not TransactionSetCore.addReinstall(self, header, key): 84 raise rpm.error("adding reinstall to transaction failed") 85 86 def addErase(self, item): 87 hdrs = [] 88 # match iterators are passed on as-is 89 if isinstance(item, rpm.mi): 90 hdrs = item 91 elif isinstance(item, rpm.hdr): 92 hdrs.append(item) 93 elif isinstance(item, (int, str)): 94 if isinstance(item, int): 95 dbi = rpm.RPMDBI_PACKAGES 96 else: 97 dbi = rpm.RPMDBI_LABEL 98 99 for h in self.dbMatch(dbi, item): 100 hdrs.append(h) 101 102 if not hdrs: 103 raise rpm.error("package not installed") 104 else: 105 raise TypeError("invalid type %s" % type(item)) 106 107 for h in hdrs: 108 if not TransactionSetCore.addErase(self, h): 109 raise rpm.error("adding erasure to transaction failed") 110 111 def run(self, callback, data): 112 rc = TransactionSetCore.run(self, callback, data, self._probFilter) 113 114 # crazy backwards compatibility goo: None for ok, list of problems 115 # if transaction didn't complete and empty list if it completed 116 # with errors 117 if rc == 0: 118 return None 119 120 res = [] 121 if rc > 0: 122 for prob in self.problems(): 123 item = ("%s" % prob, (prob.type, prob._str, prob._num)) 124 res.append(item) 125 return res 126 127 def check(self, *args, **kwds): 128 TransactionSetCore.check(self, *args, **kwds) 129 130 # compatibility: munge problem strings into dependency tuples of doom 131 res = [] 132 for p in self.problems(): 133 # is it anything we need to care about? 134 if p.type == rpm.RPMPROB_CONFLICT: 135 sense = rpm.RPMDEP_SENSE_CONFLICTS 136 elif p.type == rpm.RPMPROB_REQUIRES: 137 sense = rpm.RPMDEP_SENSE_REQUIRES 138 else: 139 continue 140 141 # strip arch, split to name, version, release 142 nevr = p.altNEVR.rsplit('.', 1)[0] 143 n, v, r = nevr.rsplit('-', 2) 144 145 # extract the dependency information 146 needs = p._str.split() 147 needname = needs[0] 148 needflags = rpm.RPMSENSE_ANY 149 if len(needs) == 3: 150 needop = needs[1] 151 if '<' in needop: 152 needflags |= rpm.RPMSENSE_LESS 153 if '=' in needop: 154 needflags |= rpm.RPMSENSE_EQUAL 155 if '>' in needop: 156 needflags |= rpm.RPMSENSE_GREATER 157 needver = needs[2] 158 else: 159 needver = "" 160 161 res.append(((n, v, r), 162 (needname, needver), needflags, sense, p.key)) 163 164 return res 165 166 def hdrCheck(self, blob): 167 res, msg = TransactionSetCore.hdrCheck(self, blob) 168 # generate backwards compatibly broken exceptions 169 if res == rpm.RPMRC_NOKEY: 170 raise rpm.error("public key not available") 171 elif res == rpm.RPMRC_NOTTRUSTED: 172 raise rpm.error("public key not trusted") 173 elif res != rpm.RPMRC_OK: 174 raise rpm.error(msg) 175 176 def hdrFromFdno(self, fd): 177 res, h = TransactionSetCore.hdrFromFdno(self, fd) 178 # generate backwards compatibly broken exceptions 179 if res == rpm.RPMRC_NOKEY: 180 raise rpm.error("public key not available") 181 elif res == rpm.RPMRC_NOTTRUSTED: 182 raise rpm.error("public key not trusted") 183 elif res != rpm.RPMRC_OK: 184 raise rpm.error("error reading package header") 185 186 return h 187