1"""
2Test library.
3"""
4
5import difflib
6import inspect
7import json
8import subprocess
9import os
10import posixpath
11import shlex
12import shutil
13import string
14import threading
15import urllib
16import pprint
17import SocketServer
18import SimpleHTTPServer
19
20
21class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
22    pass
23
24
25class FileHTTPServerRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
26    def translate_path(self, path):
27        """Translate a /-separated PATH to the local filename syntax.
28
29        Components that mean special things to the local file system
30        (e.g. drive or directory names) are ignored.  (XXX They should
31        probably be diagnosed.)
32
33        """
34        # abandon query parameters
35        path = path.split('?', 1)[0]
36        path = path.split('#', 1)[0]
37        path = posixpath.normpath(urllib.unquote(path))
38        words = path.split('/')
39        words = filter(None, words)
40        path = self.rootPath
41        for word in words:
42            drive, word = os.path.splitdrive(word)
43            head, word = os.path.split(word)
44            if word in (os.curdir, os.pardir):
45                continue
46            path = os.path.join(path, word)
47        return path
48
49    def log_message(self, format, *args):
50        pass
51
52
53class GPGFinder(object):
54    """
55    GnuPG binary discovery.
56    """
57
58    def __init__(self):
59        self.gpg1 = self.find_gpg(["gpg1", "gpg"], "gpg (GnuPG) 1.")
60        self.gpg2 = self.find_gpg(["gpg2", "gpg"], "gpg (GnuPG) 2.")
61
62        self.gpg = self.gpg1
63        if self.gpg is None:
64            self.gpg = self.gpg2
65
66        if self.gpg is None:
67            raise Exception("GnuPG binary wasn't found")
68
69    def find_gpg(self, executables, expected_version):
70        for executable in executables:
71            try:
72                output = subprocess.check_output([executable, "--version"])
73                if expected_version in output:
74                    return executable
75            except Exception:
76                pass
77
78        return None
79
80
81class BaseTest(object):
82    """
83    Base class for all tests.
84    """
85
86    longTest = False
87    fixturePool = False
88    fixturePoolCopy = False
89    fixtureDB = False
90    fixtureGpg = False
91    fixtureWebServer = False
92    requiresFTP = False
93    requiresGPG1 = False
94    requiresGPG2 = False
95
96    expectedCode = 0
97    configFile = {
98        "rootDir": "%s/.aptly" % os.environ["HOME"],
99        "downloadConcurrency": 4,
100        "downloadSpeedLimit": 0,
101        "architectures": [],
102        "dependencyFollowSuggests": False,
103        "dependencyFollowRecommends": False,
104        "dependencyFollowAllVariants": False,
105        "dependencyFollowSource": False,
106        "gpgDisableVerify": False,
107        "gpgDisableSign": False,
108        "ppaDistributorID": "ubuntu",
109        "ppaCodename": "",
110    }
111    configOverride = {}
112    environmentOverride = {}
113
114    fixtureDBDir = os.path.join(os.environ["HOME"], "aptly-fixture-db")
115    fixturePoolDir = os.path.join(os.environ["HOME"], "aptly-fixture-pool")
116    fixtureGpgKeys = ["debian-archive-keyring.gpg",
117                      "ubuntu-archive-keyring.gpg",
118                      "launchpad.key",
119                      "flat.key",
120                      "pagerduty.key",
121                      "nvidia.key",
122                      "jenkins.key"]
123
124    outputMatchPrepare = None
125
126    captureResults = False
127
128    gpgFinder = GPGFinder()
129
130    def test(self):
131        self.prepare()
132        self.run()
133        self.check()
134
135    def prepare_remove_all(self):
136        if os.path.exists(os.path.join(os.environ["HOME"], ".aptly")):
137            shutil.rmtree(os.path.join(os.environ["HOME"], ".aptly"))
138        if os.path.exists(os.path.join(os.environ["HOME"], ".aptly.conf")):
139            os.remove(os.path.join(os.environ["HOME"], ".aptly.conf"))
140        if os.path.exists(os.path.join(os.environ["HOME"], ".gnupg", "aptlytest.gpg")):
141            os.remove(os.path.join(os.environ["HOME"], ".gnupg", "aptlytest.gpg"))
142
143    def prepare_default_config(self):
144        cfg = self.configFile.copy()
145        if self.requiresGPG1:
146            cfg["gpgProvider"] = "gpg1"
147        elif self.requiresGPG2:
148            cfg["gpgProvider"] = "gpg2"
149        cfg.update(**self.configOverride)
150        f = open(os.path.join(os.environ["HOME"], ".aptly.conf"), "w")
151        f.write(json.dumps(cfg))
152        f.close()
153
154    def fixture_available(self):
155        if self.fixturePool and not os.path.exists(self.fixturePoolDir):
156            return False
157        if self.fixtureDB and not os.path.exists(self.fixtureDBDir):
158            return False
159        if self.requiresFTP and os.environ.get('NO_FTP_ACCESS', '') == 'yes':
160            return False
161        if self.requiresGPG1 and self.gpgFinder.gpg1 is None:
162            return False
163        if self.requiresGPG2 and self.gpgFinder.gpg2 is None:
164            return False
165
166        return True
167
168    def prepare_fixture(self):
169        if self.fixturePool:
170            os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755)
171            os.symlink(self.fixturePoolDir, os.path.join(os.environ["HOME"], ".aptly", "pool"))
172
173        if self.fixturePoolCopy:
174            os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755)
175            shutil.copytree(self.fixturePoolDir, os.path.join(os.environ["HOME"], ".aptly", "pool"), ignore=shutil.ignore_patterns(".git"))
176
177        if self.fixtureDB:
178            shutil.copytree(self.fixtureDBDir, os.path.join(os.environ["HOME"], ".aptly", "db"))
179
180        if self.fixtureWebServer:
181            self.webServerUrl = self.start_webserver(os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)),
182                                                     self.fixtureWebServer))
183
184        if self.requiresGPG2:
185            self.run_cmd([
186                self.gpgFinder.gpg2, "--import",
187                os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files") + "/aptly.sec"], expected_code=None)
188
189        if self.fixtureGpg:
190            self.run_cmd([self.gpgFinder.gpg, "--no-default-keyring", "--trust-model", "always", "--batch", "--keyring", "aptlytest.gpg", "--import"] +
191                         [os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files", key) for key in self.fixtureGpgKeys])
192
193        if hasattr(self, "fixtureCmds"):
194            for cmd in self.fixtureCmds:
195                self.run_cmd(cmd)
196
197    def run(self):
198        self.output = self.output_processor(self.run_cmd(self.runCmd, self.expectedCode))
199
200    def _start_process(self, command, stderr=subprocess.STDOUT, stdout=None):
201        if not hasattr(command, "__iter__"):
202            params = {
203                'files': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files"),
204                'changes': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "changes"),
205                'udebs': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "udebs"),
206                'testfiles': os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__),
207                'aptlyroot': os.path.join(os.environ["HOME"], ".aptly"),
208            }
209            if self.fixtureWebServer:
210                params['url'] = self.webServerUrl
211
212            command = string.Template(command).substitute(params)
213
214            command = shlex.split(command)
215        environ = os.environ.copy()
216        environ["LC_ALL"] = "C"
217        environ.update(self.environmentOverride)
218        return subprocess.Popen(command, stderr=stderr, stdout=stdout, env=environ)
219
220    def run_cmd(self, command, expected_code=0):
221        try:
222            proc = self._start_process(command, stdout=subprocess.PIPE)
223            output, _ = proc.communicate()
224            if expected_code is not None:
225                if proc.returncode != expected_code:
226                    raise Exception("exit code %d != %d (output: %s)" % (proc.returncode, expected_code, output))
227            return output
228        except Exception, e:
229            raise Exception("Running command %s failed: %s" % (command, str(e)))
230
231    def gold_processor(self, gold):
232        return gold
233
234    def output_processor(self, output):
235        return output
236
237    def expand_environ(self, gold):
238        return string.Template(gold).substitute(os.environ)
239
240    def get_gold_filename(self, gold_name="gold"):
241        return os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__ + "_" + gold_name)
242
243    def get_gold(self, gold_name="gold"):
244        return self.gold_processor(open(self.get_gold_filename(gold_name), "r").read())
245
246    def check_output(self):
247        try:
248            self.verify_match(self.get_gold(), self.output, match_prepare=self.outputMatchPrepare)
249        except:  # noqa: E722
250            if self.captureResults:
251                if self.outputMatchPrepare is not None:
252                    self.output = self.outputMatchPrepare(self.output)
253                with open(self.get_gold_filename(), "w") as f:
254                    f.write(self.output)
255            else:
256                raise
257
258    def check_cmd_output(self, command, gold_name, match_prepare=None, expected_code=0):
259        output = self.run_cmd(command, expected_code=expected_code)
260        try:
261            self.verify_match(self.get_gold(gold_name), output, match_prepare)
262        except:  # noqa: E722
263            if self.captureResults:
264                if match_prepare is not None:
265                    output = match_prepare(output)
266                with open(self.get_gold_filename(gold_name), "w") as f:
267                    f.write(output)
268            else:
269                raise
270
271    def read_file(self, path):
272        with open(os.path.join(os.environ["HOME"], ".aptly", path), "r") as f:
273            return f.read()
274
275    def delete_file(self, path):
276        os.unlink(os.path.join(os.environ["HOME"], ".aptly", path))
277
278    def check_file_contents(self, path, gold_name, match_prepare=None):
279        contents = self.read_file(path)
280        try:
281
282            self.verify_match(self.get_gold(gold_name), contents, match_prepare=match_prepare)
283        except:  # noqa: E722
284            if self.captureResults:
285                if match_prepare is not None:
286                    contents = match_prepare(contents)
287                with open(self.get_gold_filename(gold_name), "w") as f:
288                    f.write(contents)
289            else:
290                raise
291
292    def check_file(self):
293        contents = open(self.checkedFile, "r").read()
294        try:
295            self.verify_match(self.get_gold(), contents)
296        except:  # noqa: E722
297            if self.captureResults:
298                with open(self.get_gold_filename(), "w") as f:
299                    f.write(contents)
300            else:
301                raise
302
303    def check_exists(self, path):
304        if not os.path.exists(os.path.join(os.environ["HOME"], ".aptly", path)):
305            raise Exception("path %s doesn't exist" % (path, ))
306
307    def check_not_exists(self, path):
308        if os.path.exists(os.path.join(os.environ["HOME"], ".aptly", path)):
309            raise Exception("path %s exists" % (path, ))
310
311    def check_file_not_empty(self, path):
312        if os.stat(os.path.join(os.environ["HOME"], ".aptly", path))[6] == 0:
313            raise Exception("file %s is empty" % (path, ))
314
315    def check_equal(self, a, b):
316        if a != b:
317            self.verify_match(a, b, match_prepare=pprint.pformat)
318
319    def check_ge(self, a, b):
320        if not a >= b:
321            raise Exception("%s is not greater or equal to %s" % (a, b))
322
323    def check_gt(self, a, b):
324        if not a > b:
325            raise Exception("%s is not greater to %s" % (a, b))
326
327    def check_in(self, item, l):
328        if item not in l:
329            raise Exception("item %r not in %r", item, l)
330
331    def check_subset(self, a, b):
332        diff = ''
333        for k, v in a.items():
334            if k not in b:
335                diff += "unexpected key '%s'\n" % (k,)
336            elif b[k] != v:
337                diff += "wrong value '%s' for key '%s', expected '%s'\n" % (v, k, b[k])
338        if diff:
339            raise Exception("content doesn't match:\n" + diff)
340
341    def verify_match(self, a, b, match_prepare=None):
342        if match_prepare is not None:
343            a = match_prepare(a)
344            b = match_prepare(b)
345
346        if a != b:
347            diff = "".join(difflib.unified_diff([l + "\n" for l in a.split("\n")], [l + "\n" for l in b.split("\n")]))
348
349            raise Exception("content doesn't match:\n" + diff + "\n")
350
351    check = check_output
352
353    def prepare(self):
354        self.prepare_remove_all()
355        self.prepare_default_config()
356        self.prepare_fixture()
357
358    def start_webserver(self, directory):
359        FileHTTPServerRequestHandler.rootPath = directory
360        self.webserver = ThreadedTCPServer(("localhost", 0), FileHTTPServerRequestHandler)
361
362        server_thread = threading.Thread(target=self.webserver.serve_forever)
363        server_thread.daemon = True
364        server_thread.start()
365
366        return "http://%s:%d/" % self.webserver.server_address
367
368    def shutdown(self):
369        if hasattr(self, 'webserver'):
370            self.shutdown_webserver()
371
372    def shutdown_webserver(self):
373        self.webserver.shutdown()
374
375    @classmethod
376    def shutdown_class(cls):
377        pass
378