1""" 2Test library. 3""" 4 5import difflib 6import inspect 7import json 8import subprocess 9import os 10import posixpath 11import shlex 12import shutil 13import string 14import threading 15import urllib 16import pprint 17import SocketServer 18import SimpleHTTPServer 19 20 21class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): 22 pass 23 24 25class FileHTTPServerRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): 26 def translate_path(self, path): 27 """Translate a /-separated PATH to the local filename syntax. 28 29 Components that mean special things to the local file system 30 (e.g. drive or directory names) are ignored. (XXX They should 31 probably be diagnosed.) 32 33 """ 34 # abandon query parameters 35 path = path.split('?', 1)[0] 36 path = path.split('#', 1)[0] 37 path = posixpath.normpath(urllib.unquote(path)) 38 words = path.split('/') 39 words = filter(None, words) 40 path = self.rootPath 41 for word in words: 42 drive, word = os.path.splitdrive(word) 43 head, word = os.path.split(word) 44 if word in (os.curdir, os.pardir): 45 continue 46 path = os.path.join(path, word) 47 return path 48 49 def log_message(self, format, *args): 50 pass 51 52 53class GPGFinder(object): 54 """ 55 GnuPG binary discovery. 56 """ 57 58 def __init__(self): 59 self.gpg1 = self.find_gpg(["gpg1", "gpg"], "gpg (GnuPG) 1.") 60 self.gpg2 = self.find_gpg(["gpg2", "gpg"], "gpg (GnuPG) 2.") 61 62 self.gpg = self.gpg1 63 if self.gpg is None: 64 self.gpg = self.gpg2 65 66 if self.gpg is None: 67 raise Exception("GnuPG binary wasn't found") 68 69 def find_gpg(self, executables, expected_version): 70 for executable in executables: 71 try: 72 output = subprocess.check_output([executable, "--version"]) 73 if expected_version in output: 74 return executable 75 except Exception: 76 pass 77 78 return None 79 80 81class BaseTest(object): 82 """ 83 Base class for all tests. 84 """ 85 86 longTest = False 87 fixturePool = False 88 fixturePoolCopy = False 89 fixtureDB = False 90 fixtureGpg = False 91 fixtureWebServer = False 92 requiresFTP = False 93 requiresGPG1 = False 94 requiresGPG2 = False 95 96 expectedCode = 0 97 configFile = { 98 "rootDir": "%s/.aptly" % os.environ["HOME"], 99 "downloadConcurrency": 4, 100 "downloadSpeedLimit": 0, 101 "architectures": [], 102 "dependencyFollowSuggests": False, 103 "dependencyFollowRecommends": False, 104 "dependencyFollowAllVariants": False, 105 "dependencyFollowSource": False, 106 "gpgDisableVerify": False, 107 "gpgDisableSign": False, 108 "ppaDistributorID": "ubuntu", 109 "ppaCodename": "", 110 } 111 configOverride = {} 112 environmentOverride = {} 113 114 fixtureDBDir = os.path.join(os.environ["HOME"], "aptly-fixture-db") 115 fixturePoolDir = os.path.join(os.environ["HOME"], "aptly-fixture-pool") 116 fixtureGpgKeys = ["debian-archive-keyring.gpg", 117 "ubuntu-archive-keyring.gpg", 118 "launchpad.key", 119 "flat.key", 120 "pagerduty.key", 121 "nvidia.key", 122 "jenkins.key"] 123 124 outputMatchPrepare = None 125 126 captureResults = False 127 128 gpgFinder = GPGFinder() 129 130 def test(self): 131 self.prepare() 132 self.run() 133 self.check() 134 135 def prepare_remove_all(self): 136 if os.path.exists(os.path.join(os.environ["HOME"], ".aptly")): 137 shutil.rmtree(os.path.join(os.environ["HOME"], ".aptly")) 138 if os.path.exists(os.path.join(os.environ["HOME"], ".aptly.conf")): 139 os.remove(os.path.join(os.environ["HOME"], ".aptly.conf")) 140 if os.path.exists(os.path.join(os.environ["HOME"], ".gnupg", "aptlytest.gpg")): 141 os.remove(os.path.join(os.environ["HOME"], ".gnupg", "aptlytest.gpg")) 142 143 def prepare_default_config(self): 144 cfg = self.configFile.copy() 145 if self.requiresGPG1: 146 cfg["gpgProvider"] = "gpg1" 147 elif self.requiresGPG2: 148 cfg["gpgProvider"] = "gpg2" 149 cfg.update(**self.configOverride) 150 f = open(os.path.join(os.environ["HOME"], ".aptly.conf"), "w") 151 f.write(json.dumps(cfg)) 152 f.close() 153 154 def fixture_available(self): 155 if self.fixturePool and not os.path.exists(self.fixturePoolDir): 156 return False 157 if self.fixtureDB and not os.path.exists(self.fixtureDBDir): 158 return False 159 if self.requiresFTP and os.environ.get('NO_FTP_ACCESS', '') == 'yes': 160 return False 161 if self.requiresGPG1 and self.gpgFinder.gpg1 is None: 162 return False 163 if self.requiresGPG2 and self.gpgFinder.gpg2 is None: 164 return False 165 166 return True 167 168 def prepare_fixture(self): 169 if self.fixturePool: 170 os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755) 171 os.symlink(self.fixturePoolDir, os.path.join(os.environ["HOME"], ".aptly", "pool")) 172 173 if self.fixturePoolCopy: 174 os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755) 175 shutil.copytree(self.fixturePoolDir, os.path.join(os.environ["HOME"], ".aptly", "pool"), ignore=shutil.ignore_patterns(".git")) 176 177 if self.fixtureDB: 178 shutil.copytree(self.fixtureDBDir, os.path.join(os.environ["HOME"], ".aptly", "db")) 179 180 if self.fixtureWebServer: 181 self.webServerUrl = self.start_webserver(os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), 182 self.fixtureWebServer)) 183 184 if self.requiresGPG2: 185 self.run_cmd([ 186 self.gpgFinder.gpg2, "--import", 187 os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files") + "/aptly.sec"], expected_code=None) 188 189 if self.fixtureGpg: 190 self.run_cmd([self.gpgFinder.gpg, "--no-default-keyring", "--trust-model", "always", "--batch", "--keyring", "aptlytest.gpg", "--import"] + 191 [os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files", key) for key in self.fixtureGpgKeys]) 192 193 if hasattr(self, "fixtureCmds"): 194 for cmd in self.fixtureCmds: 195 self.run_cmd(cmd) 196 197 def run(self): 198 self.output = self.output_processor(self.run_cmd(self.runCmd, self.expectedCode)) 199 200 def _start_process(self, command, stderr=subprocess.STDOUT, stdout=None): 201 if not hasattr(command, "__iter__"): 202 params = { 203 'files': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files"), 204 'changes': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "changes"), 205 'udebs': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "udebs"), 206 'testfiles': os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__), 207 'aptlyroot': os.path.join(os.environ["HOME"], ".aptly"), 208 } 209 if self.fixtureWebServer: 210 params['url'] = self.webServerUrl 211 212 command = string.Template(command).substitute(params) 213 214 command = shlex.split(command) 215 environ = os.environ.copy() 216 environ["LC_ALL"] = "C" 217 environ.update(self.environmentOverride) 218 return subprocess.Popen(command, stderr=stderr, stdout=stdout, env=environ) 219 220 def run_cmd(self, command, expected_code=0): 221 try: 222 proc = self._start_process(command, stdout=subprocess.PIPE) 223 output, _ = proc.communicate() 224 if expected_code is not None: 225 if proc.returncode != expected_code: 226 raise Exception("exit code %d != %d (output: %s)" % (proc.returncode, expected_code, output)) 227 return output 228 except Exception, e: 229 raise Exception("Running command %s failed: %s" % (command, str(e))) 230 231 def gold_processor(self, gold): 232 return gold 233 234 def output_processor(self, output): 235 return output 236 237 def expand_environ(self, gold): 238 return string.Template(gold).substitute(os.environ) 239 240 def get_gold_filename(self, gold_name="gold"): 241 return os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__ + "_" + gold_name) 242 243 def get_gold(self, gold_name="gold"): 244 return self.gold_processor(open(self.get_gold_filename(gold_name), "r").read()) 245 246 def check_output(self): 247 try: 248 self.verify_match(self.get_gold(), self.output, match_prepare=self.outputMatchPrepare) 249 except: # noqa: E722 250 if self.captureResults: 251 if self.outputMatchPrepare is not None: 252 self.output = self.outputMatchPrepare(self.output) 253 with open(self.get_gold_filename(), "w") as f: 254 f.write(self.output) 255 else: 256 raise 257 258 def check_cmd_output(self, command, gold_name, match_prepare=None, expected_code=0): 259 output = self.run_cmd(command, expected_code=expected_code) 260 try: 261 self.verify_match(self.get_gold(gold_name), output, match_prepare) 262 except: # noqa: E722 263 if self.captureResults: 264 if match_prepare is not None: 265 output = match_prepare(output) 266 with open(self.get_gold_filename(gold_name), "w") as f: 267 f.write(output) 268 else: 269 raise 270 271 def read_file(self, path): 272 with open(os.path.join(os.environ["HOME"], ".aptly", path), "r") as f: 273 return f.read() 274 275 def delete_file(self, path): 276 os.unlink(os.path.join(os.environ["HOME"], ".aptly", path)) 277 278 def check_file_contents(self, path, gold_name, match_prepare=None): 279 contents = self.read_file(path) 280 try: 281 282 self.verify_match(self.get_gold(gold_name), contents, match_prepare=match_prepare) 283 except: # noqa: E722 284 if self.captureResults: 285 if match_prepare is not None: 286 contents = match_prepare(contents) 287 with open(self.get_gold_filename(gold_name), "w") as f: 288 f.write(contents) 289 else: 290 raise 291 292 def check_file(self): 293 contents = open(self.checkedFile, "r").read() 294 try: 295 self.verify_match(self.get_gold(), contents) 296 except: # noqa: E722 297 if self.captureResults: 298 with open(self.get_gold_filename(), "w") as f: 299 f.write(contents) 300 else: 301 raise 302 303 def check_exists(self, path): 304 if not os.path.exists(os.path.join(os.environ["HOME"], ".aptly", path)): 305 raise Exception("path %s doesn't exist" % (path, )) 306 307 def check_not_exists(self, path): 308 if os.path.exists(os.path.join(os.environ["HOME"], ".aptly", path)): 309 raise Exception("path %s exists" % (path, )) 310 311 def check_file_not_empty(self, path): 312 if os.stat(os.path.join(os.environ["HOME"], ".aptly", path))[6] == 0: 313 raise Exception("file %s is empty" % (path, )) 314 315 def check_equal(self, a, b): 316 if a != b: 317 self.verify_match(a, b, match_prepare=pprint.pformat) 318 319 def check_ge(self, a, b): 320 if not a >= b: 321 raise Exception("%s is not greater or equal to %s" % (a, b)) 322 323 def check_gt(self, a, b): 324 if not a > b: 325 raise Exception("%s is not greater to %s" % (a, b)) 326 327 def check_in(self, item, l): 328 if item not in l: 329 raise Exception("item %r not in %r", item, l) 330 331 def check_subset(self, a, b): 332 diff = '' 333 for k, v in a.items(): 334 if k not in b: 335 diff += "unexpected key '%s'\n" % (k,) 336 elif b[k] != v: 337 diff += "wrong value '%s' for key '%s', expected '%s'\n" % (v, k, b[k]) 338 if diff: 339 raise Exception("content doesn't match:\n" + diff) 340 341 def verify_match(self, a, b, match_prepare=None): 342 if match_prepare is not None: 343 a = match_prepare(a) 344 b = match_prepare(b) 345 346 if a != b: 347 diff = "".join(difflib.unified_diff([l + "\n" for l in a.split("\n")], [l + "\n" for l in b.split("\n")])) 348 349 raise Exception("content doesn't match:\n" + diff + "\n") 350 351 check = check_output 352 353 def prepare(self): 354 self.prepare_remove_all() 355 self.prepare_default_config() 356 self.prepare_fixture() 357 358 def start_webserver(self, directory): 359 FileHTTPServerRequestHandler.rootPath = directory 360 self.webserver = ThreadedTCPServer(("localhost", 0), FileHTTPServerRequestHandler) 361 362 server_thread = threading.Thread(target=self.webserver.serve_forever) 363 server_thread.daemon = True 364 server_thread.start() 365 366 return "http://%s:%d/" % self.webserver.server_address 367 368 def shutdown(self): 369 if hasattr(self, 'webserver'): 370 self.shutdown_webserver() 371 372 def shutdown_webserver(self): 373 self.webserver.shutdown() 374 375 @classmethod 376 def shutdown_class(cls): 377 pass 378