#!/usr/bin/env python3
#
# Copyright © 2017, 2020 Chris Lamb <lamby@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import bz2
import collections
import json
import logging
import os
import re
import subprocess
import sys
import time

import apt
import requests

try:
    from xdg.BaseDirectory import xdg_cache_home
except ImportError:
    print("This script requires the xdg python3 module.", file=sys.stderr)
    print(
        "Please install the python3-xdg Debian package in order to use "
        "this utility.",
        file=sys.stderr,
    )
    sys.exit(1)


class ReproducibleCheck:
    """
    Command-line tool that compares the locally installed packages against
    the status file published at STATUS_URL and reports the ones that are
    not marked as reproducible there.
    """

    HELP = """
        Reports on the reproducible status of installed packages.
        For more details please see <https://reproducible-builds.org>.
    """

    NAME = os.path.basename(__file__)
    VERSION = 1

    STATUS_URL = (
        "https://tests.reproducible-builds.org/debian/reproducible-tracker.json.bz2"
    )

    # The downloaded status file is kept under the user's XDG cache directory
    # and refreshed once it is older than CACHE_AGE_SECONDS.
    CACHE = os.path.join(xdg_cache_home, NAME, os.path.basename(STATUS_URL))
    CACHE_AGE_SECONDS = 86400

    # Upper bound (in seconds) for connecting to / reading from STATUS_URL so
    # that a stalled server cannot hang the tool forever.
    HTTP_TIMEOUT_SECONDS = 60

    @classmethod
    def parse(cls):
        """
        Parse the command line (sys.argv) and return an instance of this
        class configured with the resulting arguments.
        """

        parser = argparse.ArgumentParser(description=cls.HELP)

        parser.add_argument(
            "-d",
            "--debug",
            help="show debugging messages",
            default=False,
            action="store_true",
        )

        parser.add_argument(
            "-r",
            "--raw",
            help="print unreproducible binary packages only (for dd-list -i)",
            default=False,
            action="store_true",
        )

        parser.add_argument(
            "--version",
            help="print version and exit",
            default=False,
            action="store_true",
        )

        return cls(parser.parse_args())

    def __init__(self, args):
        """
        Store the parsed arguments and configure the root logger; the log
        level is DEBUG when ``args.debug`` is set and INFO otherwise.
        """

        self.args = args

        logging.basicConfig(
            format="%(asctime).19s %(levelname).1s: %(message)s",
            level=logging.DEBUG if args.debug else logging.INFO,
        )

        self.log = logging.getLogger()

    def main(self):
        """
        Run the tool and return the process exit status: 0 on success, 2 when
        the detected distribution is not Debian.
        """

        if self.args.version:
            print(f"{self.NAME} version {self.VERSION}")
            return 0

        if self.get_distributor_id() != "Debian":
            self.log.error("Refusing to return results for non-Debian distributions")
            return 2

        self.update_cache()

        installed = self.get_installed_packages()
        reproducible = self.get_reproducible_packages()

        if self.args.raw:
            self.output_raw(installed, reproducible)
        else:
            self.output_by_source(installed, reproducible)

        self.log.info(
            "These results are based on data from the Reproducible Builds "
            "CI framework, showing only the theoretical (and "
            "unofficial) reproducibility of these Debian packages."
        )

        return 0

    def get_distributor_id(self):
        """
        Return the distributor ID printed by ``lsb_release -is``, or an empty
        string if the command fails or cannot be executed at all.
        """

        try:
            distribution_id = (
                subprocess.check_output(("lsb_release", "-is")).decode("utf-8").strip()
            )
        except (subprocess.CalledProcessError, OSError):
            # OSError covers lsb_release not being installed or executable;
            # treat it like a failed call rather than crashing.
            distribution_id = ""

        self.log.debug("Detected distribution %s", distribution_id or "(unknown)")

        return distribution_id

    def update_cache(self):
        """
        Download STATUS_URL into CACHE unless the cached copy is younger than
        CACHE_AGE_SECONDS.

        Raises ``requests.RequestException`` if the download fails or the
        server answers with an HTTP error status.
        """

        self.log.debug("Checking cache file %s ...", self.CACHE)

        try:
            if os.path.getmtime(self.CACHE) >= time.time() - self.CACHE_AGE_SECONDS:
                self.log.debug("Cache is up to date")
                return
        except OSError:
            # No usable cache file yet; fall through and download one.
            pass

        new_cache = f"{self.CACHE}.new"
        self.log.info("Updating cache to %s...", new_cache)

        os.makedirs(os.path.dirname(self.CACHE), exist_ok=True)

        with requests.get(
            self.STATUS_URL, stream=True, timeout=self.HTTP_TIMEOUT_SECONDS
        ) as response:
            # Never store an HTTP error page as if it were the bz2 payload.
            response.raise_for_status()

            with open(new_cache, "wb") as f:
                for chunk in response.iter_content(chunk_size=2 ** 16):
                    f.write(chunk)

        # Write to a temporary name first and swap it in afterwards so an
        # interrupted download never leaves a truncated cache file behind.
        os.replace(new_cache, self.CACHE)

    def get_reproducible_packages(self):
        """
        Return (source, architecture, version) triplets for reproducible source
        packages.
        """

        self.log.debug("Loading data from cache %s", self.CACHE)

        data = set()
        source_packages = set()

        with bz2.open(self.CACHE) as f:
            all_packages = json.loads(f.read().decode("utf-8"))

        for package in all_packages:
            for details in package["architecture_details"]:
                if details["status"] != "reproducible":
                    continue

                data.add(
                    (package["package"], details["architecture"], package["version"])
                )

            source_packages.add(package["package"])

        self.log.debug("Parsed data about %d source packages", len(source_packages))

        return data

    def get_installed_packages(self):
        """
        Return (binary_package, architecture, version) triplets, mapped to
        their corresponding source package.
        """

        result = {}
        for package in apt.Cache():
            for candidate in package.versions:
                if not candidate.is_installed:
                    continue

                # We may have installed a binNMU version locally so we need to
                # strip these off when looking up against the JSON of results.
                version = re.sub(r"\+b\d+$", "", candidate.version)

                key = (package.shortname, candidate.architecture, version)
                result[key] = candidate.source_name

        self.log.debug("Parsed %d installed binary packages", len(result))

        return result

    def iter_installed_unreproducible(self, installed, reproducible):
        """
        Yield (binary, source, version) for every entry of ``installed`` whose
        (source, architecture, version) triplet is not in ``reproducible``.

        ``installed`` is the mapping returned by get_installed_packages() and
        ``reproducible`` the set returned by get_reproducible_packages().
        """

        # "Architecture: all" binary packages should pretend to the system's
        # default architecture for lookup purposes.
        default_architecture = apt.apt_pkg.config.find("APT::Architecture")
        self.log.debug(
            "Using %s as our 'Architecture: all' lookup", default_architecture
        )

        for key, source in sorted(installed.items()):
            binary, architecture, version = key

            if architecture == "all":
                architecture = default_architecture

            lookup_key = (source, architecture, version)

            if lookup_key not in reproducible:
                yield binary, source, version

    def output_by_source(self, installed, reproducible):
        """
        Print one line per unreproducible source package (listing the affected
        binary packages) followed by a summary of how many installed binary
        packages are reproducible.
        """

        by_source = collections.defaultdict(set)

        num_unreproducible = 0
        for binary, source, version in self.iter_installed_unreproducible(
            installed, reproducible
        ):
            by_source[(source, version)].add(binary)
            num_unreproducible += 1

        for (source, version), binaries in sorted(by_source.items()):
            # Calculate some clarifying suffixes/prefixes
            src = ""
            pkgs = ""
            if binaries != {source}:
                src = "src:"
                # Sort so the output is stable between runs.
                pkgs = " ({})".format(", ".join(sorted(binaries)))

            print(
                f"{src}{source} ({version}){pkgs} is not reproducible "
                f"<https://tests.reproducible-builds.org/debian/{source}>"
            )

        num_installed = len(installed)
        num_reproducible = num_installed - num_unreproducible
        # Guard against an empty package list instead of dividing by zero.
        percent = 100.0 * num_reproducible / num_installed if num_installed else 0.0
        print(
            f"{num_reproducible}/{num_installed} ({percent:.2f}%) of "
            f"installed binary packages are reproducible."
        )

    def output_raw(self, installed, reproducible):
        """
        Print only the names of the unreproducible binary packages, one per
        line.
        """

        for binary, _, _ in self.iter_installed_unreproducible(installed, reproducible):
            print(binary)


if __name__ == "__main__":
    try:
        sys.exit(ReproducibleCheck.parse().main())
    except (KeyboardInterrupt, BrokenPipeError):
        sys.exit(1)