1""" 2pgxnclient -- zip file utilities 3""" 4 5# Copyright (C) 2011-2021 Daniele Varrazzo 6 7# This file is part of the PGXN client 8 9import os 10import stat 11import zipfile 12 13from pgxnclient.i18n import _ 14from pgxnclient.errors import PgxnClientException 15from pgxnclient.archive import Archive 16 17import logging 18 19logger = logging.getLogger('pgxnclient.zip') 20 21 22class ZipArchive(Archive): 23 """Handle .zip archives""" 24 25 _file = None 26 27 def can_open(self): 28 return zipfile.is_zipfile(self.filename) 29 30 def open(self): 31 assert not self._file, "archive already open" 32 try: 33 self._file = zipfile.ZipFile(self.filename, 'r') 34 except Exception as e: 35 raise PgxnClientException( 36 _("cannot open archive '%s': %s") % (self.filename, e) 37 ) 38 39 def close(self): 40 if self._file is not None: 41 self._file.close() 42 self._file = None 43 44 def list_files(self): 45 assert self._file, "archive not open" 46 return self._file.namelist() 47 48 def read(self, fn): 49 assert self._file, "archive not open" 50 return self._file.read(fn) 51 52 def unpack(self, destdir): 53 zipname = self.filename 54 logger.info(_("unpacking: %s"), zipname) 55 destdir = os.path.abspath(destdir) 56 self.open() 57 try: 58 for fn in self.list_files(): 59 fname = os.path.abspath(os.path.join(destdir, fn)) 60 if not fname.startswith(destdir): 61 raise PgxnClientException( 62 _("archive file '%s' trying to escape!") % fname 63 ) 64 65 # Looks like checking for a trailing / is the only way to 66 # tell if the file is a directory. 67 if fn.endswith('/'): 68 os.makedirs(fname) 69 continue 70 71 # The directory is not always explicitly present in the archive 72 if not os.path.exists(os.path.dirname(fname)): 73 os.makedirs(os.path.dirname(fname)) 74 75 # Copy the file content 76 logger.debug(_("saving: %s"), fname) 77 fout = open(fname, "wb") 78 try: 79 data = self.read(fn) 80 # In order to restore the executable bit, I haven't find 81 # anything that looks like an executable flag in the zipinfo, 82 # so look at the hashbangs... 83 isexec = data[:2] == b'#!' 84 fout.write(data) 85 finally: 86 fout.close() 87 88 if isexec: 89 os.chmod( 90 fname, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC 91 ) 92 93 finally: 94 self.close() 95 96 return self._find_work_directory(destdir) 97 98 99def unpack(filename, destdir): 100 return ZipArchive(filename).unpack(destdir) 101