1#!/usr/bin/env python3
2#
3# Copyright © 2017, 2020 Chris Lamb <lamby@debian.org>
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or (at
8# your option) any later version.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13# General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18import argparse
19import bz2
20import collections
21import json
22import logging
23import os
24import re
25import subprocess
26import sys
27import time
28
29import apt
30import requests
31
try:
    from xdg.BaseDirectory import xdg_cache_home
except ImportError:
    # The cache location is derived from xdg_cache_home, so the script cannot
    # do anything useful without it: explain how to fix that and bail out.
    print("This script requires the xdg python3 module.", file=sys.stderr)
    print(
        # Note the trailing space: adjacent literals are concatenated as-is.
        "Please install the python3-xdg Debian package in order to use "
        "this utility.",
        file=sys.stderr,
    )
    sys.exit(1)
41
42
class ReproducibleCheck:
    """
    Command-line tool reporting which installed packages are not listed as
    reproducible in the status dump published at STATUS_URL.

    Typical use is ``ReproducibleCheck.parse().main()``, whose return value is
    suitable as a process exit code.
    """

    HELP = """
        Reports on the reproducible status of installed packages.
        For more details please see <https://reproducible-builds.org>.
    """

    NAME = os.path.basename(__file__)
    VERSION = 1

    STATUS_URL = (
        "https://tests.reproducible-builds.org/debian/reproducible-tracker.json.bz2"
    )

    # The downloaded status file is kept under the user's XDG cache directory
    # and re-downloaded once it is older than CACHE_AGE_SECONDS.
    CACHE = os.path.join(xdg_cache_home, NAME, os.path.basename(STATUS_URL))
    CACHE_AGE_SECONDS = 86400

    # Give up on an unresponsive server instead of hanging forever.
    DOWNLOAD_TIMEOUT_SECONDS = 60

    # Matches the "+bN" suffix of a binNMU version; compiled once because it
    # is applied to every installed package.
    BINNMU_SUFFIX_RE = re.compile(r"\+b\d+$")

    @classmethod
    def parse(cls):
        """
        Parse the command line and return an instance configured from it.
        """

        parser = argparse.ArgumentParser(description=cls.HELP)

        parser.add_argument(
            "-d",
            "--debug",
            help="show debugging messages",
            default=False,
            action="store_true",
        )

        parser.add_argument(
            "-r",
            "--raw",
            help="print unreproducible binary packages only (for dd-list -i)",
            default=False,
            action="store_true",
        )

        parser.add_argument(
            "--version",
            help="print version and exit",
            default=False,
            action="store_true",
        )

        return cls(parser.parse_args())

    def __init__(self, args):
        """
        Store the parsed arguments and configure logging.

        ``args`` must provide the ``debug``, ``raw`` and ``version``
        attributes created by parse().
        """

        self.args = args

        logging.basicConfig(
            format="%(asctime).19s %(levelname).1s: %(message)s",
            level=logging.DEBUG if args.debug else logging.INFO,
        )

        self.log = logging.getLogger()

    def main(self):
        """
        Run the check and print the report.

        Returns 0 on success (including ``--version``) and 2 when the
        distribution is not detected as Debian.
        """

        if self.args.version:
            print(f"{self.NAME} version {self.VERSION}")
            return 0

        if self.get_distributor_id() != "Debian":
            self.log.error("Refusing to return results for non-Debian distributions")
            return 2

        self.update_cache()

        installed = self.get_installed_packages()
        reproducible = self.get_reproducible_packages()

        if self.args.raw:
            self.output_raw(installed, reproducible)
        else:
            self.output_by_source(installed, reproducible)

        self.log.info(
            "These results are based on data from the Reproducible Builds "
            "CI framework, showing only the theoretical (and "
            "unofficial) reproducibility of these Debian packages."
        )

        return 0

    def get_distributor_id(self):
        """
        Return the distributor ID printed by ``lsb_release -is``.

        Returns an empty string when the command fails or cannot be run at
        all (for example because lsb_release is not installed).
        """

        try:
            distribution_id = (
                subprocess.check_output(("lsb_release", "-is")).decode("utf-8").strip()
            )
        except (OSError, subprocess.CalledProcessError):
            # OSError covers a missing or non-executable lsb_release binary,
            # which should be treated as "unknown" rather than crash.
            distribution_id = ""

        self.log.debug("Detected distribution %s", distribution_id or "(unknown)")

        return distribution_id

    def update_cache(self):
        """
        Download STATUS_URL to CACHE unless the cached copy is recent enough.

        Raises a requests exception if the download fails or the server
        answers with an error status; the existing cache is left untouched in
        that case.
        """

        self.log.debug("Checking cache file %s ...", self.CACHE)

        try:
            if os.path.getmtime(self.CACHE) >= time.time() - self.CACHE_AGE_SECONDS:
                self.log.debug("Cache is up to date")
                return
        except OSError:
            # No readable cache file yet: fall through and download one.
            pass

        new_cache = f"{self.CACHE}.new"
        self.log.info("Updating cache to %s...", new_cache)

        os.makedirs(os.path.dirname(self.CACHE), exist_ok=True)

        with requests.get(
            self.STATUS_URL, stream=True, timeout=self.DOWNLOAD_TIMEOUT_SECONDS
        ) as response:
            # Never store an HTTP error page as if it were the status dump.
            response.raise_for_status()

            with open(new_cache, "wb") as f:
                for chunk in response.iter_content(chunk_size=2 ** 16):
                    f.write(chunk)

        # Only swap the new file in once it has been written completely.
        os.replace(new_cache, self.CACHE)

    def get_reproducible_packages(self):
        """
        Return a set of (source, architecture, version) triplets for source
        packages whose status for that architecture is "reproducible".
        """

        self.log.debug("Loading data from cache %s", self.CACHE)

        with bz2.open(self.CACHE, "rt", encoding="utf-8") as f:
            all_packages = json.load(f)

        data = set()
        source_packages = set()

        for package in all_packages:
            for details in package["architecture_details"]:
                if details["status"] != "reproducible":
                    continue

                data.add(
                    (package["package"], details["architecture"], package["version"])
                )
                source_packages.add(package["package"])

        self.log.debug("Parsed data about %d source packages", len(source_packages))

        return data

    def get_installed_packages(self):
        """
        Return a dict mapping (binary_package, architecture, version)
        triplets of installed packages to their corresponding source package
        name.
        """

        result = {}
        for package in apt.Cache():
            for candidate in package.versions:
                if not candidate.is_installed:
                    continue

                # We may have installed a binNMU version locally so we need to
                # strip these off when looking up against the JSON of results.
                version = self.BINNMU_SUFFIX_RE.sub("", candidate.version)

                key = (package.shortname, candidate.architecture, version)
                result[key] = candidate.source_name

        self.log.debug("Parsed %d installed binary packages", len(result))

        return result

    def iter_installed_unreproducible(self, installed, reproducible):
        """
        Yield (binary, source, version) for every entry of ``installed`` whose
        (source, architecture, version) is not in ``reproducible``.

        ``installed`` is the mapping returned by get_installed_packages() and
        ``reproducible`` the set returned by get_reproducible_packages().
        Entries are visited in sorted order.
        """

        # "Architecture: all" binary packages should pretend to the system's
        # default architecture for lookup purposes.
        default_architecture = apt.apt_pkg.config.find("APT::Architecture")
        self.log.debug(
            "Using %s as our 'Architecture: all' lookup", default_architecture
        )

        for key, source in sorted(installed.items()):
            binary, architecture, version = key

            if architecture == "all":
                architecture = default_architecture

            if (source, architecture, version) not in reproducible:
                yield binary, source, version

    def output_by_source(self, installed, reproducible):
        """
        Print one line per unreproducible (source, version) pair followed by a
        summary of how many installed binary packages are reproducible.
        """

        by_source = collections.defaultdict(set)

        num_unreproducible = 0
        for binary, source, version in self.iter_installed_unreproducible(
            installed, reproducible
        ):
            by_source[(source, version)].add(binary)
            num_unreproducible += 1

        for (source, version), binaries in sorted(by_source.items()):
            # Calculate some clarifying suffixes/prefixes
            src = ""
            pkgs = ""
            if binaries != {source}:
                src = "src:"
                # Sort so the output does not depend on set iteration order.
                pkgs = " ({})".format(", ".join(sorted(binaries)))

            print(
                f"{src}{source} ({version}){pkgs} is not reproducible "
                f"<https://tests.reproducible-builds.org/debian/{source}>"
            )

        num_installed = len(installed)
        num_reproducible = num_installed - num_unreproducible
        # Nothing installed means nothing to divide by; report 0% then.
        percent = 100.0 * num_reproducible / num_installed if num_installed else 0.0
        print(
            f"{num_reproducible}/{num_installed} ({percent:.2f}%) of "
            f"installed binary packages are reproducible."
        )

    def output_raw(self, installed, reproducible):
        """
        Print the name of each unreproducible installed binary package, one
        per line.
        """

        for binary, _, _ in self.iter_installed_unreproducible(installed, reproducible):
            print(binary)
260
261
if __name__ == "__main__":
    # Script entry point: run the tool and exit with its return code. Being
    # interrupted, or having stdout closed early (e.g. piping into `head`),
    # exits with status 1.
    try:
        exit_code = ReproducibleCheck.parse().main()
    except (KeyboardInterrupt, BrokenPipeError):
        exit_code = 1
    sys.exit(exit_code)
267