1""" 2Utilities for handling RAR and ZIP archives 3 4Provides wrapper archive and exception classes to simplify 5archive extraction 6""" 7 8 9import os 10import shutil 11import zipfile 12 13from loguru import logger 14 15try: 16 import rarfile 17except ImportError: 18 rarfile = None 19 20logger = logger.bind(name='archive') 21 22 23class ArchiveError(Exception): 24 """Base exception for archive""" 25 26 pass 27 28 29class NeedRarFile(ArchiveError): 30 """Exception to be raised when rarfile module is missing""" 31 32 pass 33 34 35class BadArchive(ArchiveError): 36 """Wrapper exception for BadZipFile and BadRarFile""" 37 38 pass 39 40 41class NeedFirstVolume(ArchiveError): 42 """Wrapper exception for rarfile.NeedFirstVolume""" 43 44 pass 45 46 47class PathError(ArchiveError): 48 """Exception to be raised when an archive file doesn't exist""" 49 50 pass 51 52 53class FSError(ArchiveError): 54 """Exception to be raised on OS/IO exceptions""" 55 56 pass 57 58 59class FileAlreadyExists(ArchiveError): 60 """Exception to be raised when destination file already exists""" 61 62 pass 63 64 65def rarfile_set_tool_path(config): 66 """ 67 Manually set the path of unrar executable if it can't be resolved from the 68 PATH environment variable 69 """ 70 unrar_tool = config['unrar_tool'] 71 72 if unrar_tool: 73 if not rarfile: 74 logger.error('rar_tool specified with no rarfile module installed.') 75 else: 76 rarfile.UNRAR_TOOL = unrar_tool 77 logger.debug('Set RarFile.unrar_tool to: {}', unrar_tool) 78 79 80def rarfile_set_path_sep(separator): 81 """ 82 Set the path separator on rarfile module 83 """ 84 if rarfile: 85 rarfile.PATH_SEP = separator 86 87 88def makepath(path): 89 """Make directories as needed""" 90 if not os.path.exists(path): 91 logger.debug('Creating path: {}', path) 92 os.makedirs(path) 93 94 95class Archive: 96 """ 97 Base archive class. Assumes an interface similar to 98 zipfile.ZipFile or rarfile.RarFile 99 """ 100 101 def __init__(self, archive_object, path): 102 self.path = path 103 104 self.archive = archive_object(self.path) 105 106 def close(self): 107 """Release open resources.""" 108 self.archive.close() 109 110 def delete(self): 111 """Delete the volumes that make up this archive""" 112 volumes = self.volumes() 113 self.close() 114 115 try: 116 for volume in volumes: 117 os.remove(volume) 118 logger.verbose('Deleted archive: {}', volume) 119 except OSError as error: 120 raise FSError(error) 121 122 def volumes(self): 123 """Returns the list of volumes that comprise this archive""" 124 return [self.path] 125 126 def infolist(self): 127 """Returns a list of info objects describing the contents of this archive""" 128 infolist = [] 129 130 for info in self.archive.infolist(): 131 try: 132 archive_info = ArchiveInfo(info) 133 infolist.append(archive_info) 134 except ValueError as e: 135 logger.debug(e) 136 137 return infolist 138 139 def open(self, member): 140 """Returns file-like object from where the data of a member file can be read.""" 141 return self.archive.open(member) 142 143 def extract_file(self, member, destination): 144 """Extract a member file to the specified destination""" 145 try: 146 with self.open(member) as source: 147 with open(destination, 'wb') as target: 148 shutil.copyfileobj(source, target) 149 except OSError as error: 150 raise FSError(error) 151 152 153class RarArchive(Archive): 154 """ 155 Wrapper class for rarfile.RarFile 156 """ 157 158 def __init__(self, path): 159 RarArchive.check_import() 160 161 try: 162 super().__init__(rarfile.RarFile, path) 163 except rarfile.BadRarFile as error: 164 raise BadArchive(error) 165 except rarfile.NeedFirstVolume as error: 166 raise NeedFirstVolume(error) 167 except rarfile.Error as error: 168 raise ArchiveError(error) 169 170 def volumes(self): 171 """Returns the list of volumes that comprise this archive""" 172 return self.archive.volumelist() 173 174 def open(self, member): 175 """Returns file-like object from where the data of a member file can be read.""" 176 try: 177 return super().open(member) 178 except rarfile.Error as error: 179 raise ArchiveError(error) 180 181 @staticmethod 182 def check_import(): 183 if not rarfile: 184 raise NeedRarFile('Python module rarfile needed to handle RAR archives') 185 186 187class ZipArchive(Archive): 188 """ 189 Wrapper class for zipfile.ZipFile 190 """ 191 192 def __init__(self, path): 193 try: 194 super().__init__(zipfile.ZipFile, path) 195 except zipfile.BadZipfile as error: 196 raise BadArchive(error) 197 198 def open(self, member): 199 """Returns file-like object from where the data of a member file can be read.""" 200 try: 201 return super().open(member) 202 except zipfile.BadZipfile as error: 203 raise ArchiveError(error) 204 205 206class ArchiveInfo: 207 """Wrapper class for archive info objects""" 208 209 def __init__(self, info): 210 self.info = info 211 self.path = info.filename 212 self.filename = os.path.basename(self.path) 213 214 if self._is_dir(): 215 raise ValueError('Appears to be a directory: %s' % self.path) 216 217 def _is_dir(self): 218 """Indicates if info object looks to be a directory""" 219 220 if hasattr(self.info, 'isdir'): 221 return self.info.isdir() 222 else: 223 return not self.filename 224 225 def extract(self, archive, destination): 226 """Extract ArchiveInfo object to the specified destination""" 227 dest_dir = os.path.dirname(destination) 228 229 if os.path.exists(destination): 230 raise FileAlreadyExists('File already exists: %s' % destination) 231 232 logger.debug('Creating path: {}', dest_dir) 233 makepath(dest_dir) 234 235 try: 236 archive.extract_file(self.info, destination) 237 logger.verbose('Extracted: {} to {}', self.path, destination) 238 except Exception as error: 239 if os.path.exists(destination): 240 logger.debug('Cleaning up partially extracted file: {}', destination) 241 os.remove(destination) 242 243 raise error 244 245 246def open_archive(archive_path): 247 """ 248 Returns the appropriate archive object 249 """ 250 251 archive = None 252 253 if not os.path.exists(archive_path): 254 raise PathError('Path doesn\'t exist') 255 256 if zipfile.is_zipfile(archive_path): 257 archive = ZipArchive(archive_path) 258 logger.debug('Successfully opened ZIP: {}', archive_path) 259 elif rarfile and rarfile.is_rarfile(archive_path): 260 archive = RarArchive(archive_path) 261 logger.debug('Successfully opened RAR: {}', archive_path) 262 else: 263 if not rarfile: 264 logger.warning('Rarfile module not installed; unable to handle RAR archives.') 265 266 return archive 267 268 269def is_archive(path): 270 """ 271 Attempts to open an entry as an archive; returns True on success, False on failure. 272 """ 273 274 archive = None 275 276 try: 277 archive = open_archive(path) 278 if archive: 279 archive.close() 280 return True 281 except (OSError, ArchiveError) as error: 282 logger.debug('Failed to open file as archive: {} ({})', path, error) 283 284 return False 285