1"""
2Utilities for handling RAR and ZIP archives
3
4Provides wrapper archive and exception classes to simplify
5archive extraction
6"""
7
8
9import os
10import shutil
11import zipfile
12
13from loguru import logger
14
15try:
16    import rarfile
17except ImportError:
18    rarfile = None
19
20logger = logger.bind(name='archive')
21
22
23class ArchiveError(Exception):
24    """Base exception for archive"""
25
26    pass
27
28
29class NeedRarFile(ArchiveError):
30    """Exception to be raised when rarfile module is missing"""
31
32    pass
33
34
35class BadArchive(ArchiveError):
36    """Wrapper exception for BadZipFile and BadRarFile"""
37
38    pass
39
40
41class NeedFirstVolume(ArchiveError):
42    """Wrapper exception for rarfile.NeedFirstVolume"""
43
44    pass
45
46
47class PathError(ArchiveError):
48    """Exception to be raised when an archive file doesn't exist"""
49
50    pass
51
52
53class FSError(ArchiveError):
54    """Exception to be raised on OS/IO exceptions"""
55
56    pass
57
58
59class FileAlreadyExists(ArchiveError):
60    """Exception to be raised when destination file already exists"""
61
62    pass
63
64
65def rarfile_set_tool_path(config):
66    """
67    Manually set the path of unrar executable if it can't be resolved from the
68    PATH environment variable
69    """
70    unrar_tool = config['unrar_tool']
71
72    if unrar_tool:
73        if not rarfile:
74            logger.error('rar_tool specified with no rarfile module installed.')
75        else:
76            rarfile.UNRAR_TOOL = unrar_tool
77            logger.debug('Set RarFile.unrar_tool to: {}', unrar_tool)
78
79
80def rarfile_set_path_sep(separator):
81    """
82    Set the path separator on rarfile module
83    """
84    if rarfile:
85        rarfile.PATH_SEP = separator
86
87
88def makepath(path):
89    """Make directories as needed"""
90    if not os.path.exists(path):
91        logger.debug('Creating path: {}', path)
92        os.makedirs(path)
93
94
95class Archive:
96    """
97    Base archive class. Assumes an interface similar to
98    zipfile.ZipFile or rarfile.RarFile
99    """
100
101    def __init__(self, archive_object, path):
102        self.path = path
103
104        self.archive = archive_object(self.path)
105
106    def close(self):
107        """Release open resources."""
108        self.archive.close()
109
110    def delete(self):
111        """Delete the volumes that make up this archive"""
112        volumes = self.volumes()
113        self.close()
114
115        try:
116            for volume in volumes:
117                os.remove(volume)
118                logger.verbose('Deleted archive: {}', volume)
119        except OSError as error:
120            raise FSError(error)
121
122    def volumes(self):
123        """Returns the list of volumes that comprise this archive"""
124        return [self.path]
125
126    def infolist(self):
127        """Returns a list of info objects describing the contents of this archive"""
128        infolist = []
129
130        for info in self.archive.infolist():
131            try:
132                archive_info = ArchiveInfo(info)
133                infolist.append(archive_info)
134            except ValueError as e:
135                logger.debug(e)
136
137        return infolist
138
139    def open(self, member):
140        """Returns file-like object from where the data of a member file can be read."""
141        return self.archive.open(member)
142
143    def extract_file(self, member, destination):
144        """Extract a member file to the specified destination"""
145        try:
146            with self.open(member) as source:
147                with open(destination, 'wb') as target:
148                    shutil.copyfileobj(source, target)
149        except OSError as error:
150            raise FSError(error)
151
152
153class RarArchive(Archive):
154    """
155    Wrapper class for rarfile.RarFile
156    """
157
158    def __init__(self, path):
159        RarArchive.check_import()
160
161        try:
162            super().__init__(rarfile.RarFile, path)
163        except rarfile.BadRarFile as error:
164            raise BadArchive(error)
165        except rarfile.NeedFirstVolume as error:
166            raise NeedFirstVolume(error)
167        except rarfile.Error as error:
168            raise ArchiveError(error)
169
170    def volumes(self):
171        """Returns the list of volumes that comprise this archive"""
172        return self.archive.volumelist()
173
174    def open(self, member):
175        """Returns file-like object from where the data of a member file can be read."""
176        try:
177            return super().open(member)
178        except rarfile.Error as error:
179            raise ArchiveError(error)
180
181    @staticmethod
182    def check_import():
183        if not rarfile:
184            raise NeedRarFile('Python module rarfile needed to handle RAR archives')
185
186
187class ZipArchive(Archive):
188    """
189    Wrapper class for zipfile.ZipFile
190    """
191
192    def __init__(self, path):
193        try:
194            super().__init__(zipfile.ZipFile, path)
195        except zipfile.BadZipfile as error:
196            raise BadArchive(error)
197
198    def open(self, member):
199        """Returns file-like object from where the data of a member file can be read."""
200        try:
201            return super().open(member)
202        except zipfile.BadZipfile as error:
203            raise ArchiveError(error)
204
205
206class ArchiveInfo:
207    """Wrapper class for  archive info objects"""
208
209    def __init__(self, info):
210        self.info = info
211        self.path = info.filename
212        self.filename = os.path.basename(self.path)
213
214        if self._is_dir():
215            raise ValueError('Appears to be a directory: %s' % self.path)
216
217    def _is_dir(self):
218        """Indicates if info object looks to be a directory"""
219
220        if hasattr(self.info, 'isdir'):
221            return self.info.isdir()
222        else:
223            return not self.filename
224
225    def extract(self, archive, destination):
226        """Extract ArchiveInfo object to the specified destination"""
227        dest_dir = os.path.dirname(destination)
228
229        if os.path.exists(destination):
230            raise FileAlreadyExists('File already exists: %s' % destination)
231
232        logger.debug('Creating path: {}', dest_dir)
233        makepath(dest_dir)
234
235        try:
236            archive.extract_file(self.info, destination)
237            logger.verbose('Extracted: {} to {}', self.path, destination)
238        except Exception as error:
239            if os.path.exists(destination):
240                logger.debug('Cleaning up partially extracted file: {}', destination)
241                os.remove(destination)
242
243            raise error
244
245
246def open_archive(archive_path):
247    """
248    Returns the appropriate archive object
249    """
250
251    archive = None
252
253    if not os.path.exists(archive_path):
254        raise PathError('Path doesn\'t exist')
255
256    if zipfile.is_zipfile(archive_path):
257        archive = ZipArchive(archive_path)
258        logger.debug('Successfully opened ZIP: {}', archive_path)
259    elif rarfile and rarfile.is_rarfile(archive_path):
260        archive = RarArchive(archive_path)
261        logger.debug('Successfully opened RAR: {}', archive_path)
262    else:
263        if not rarfile:
264            logger.warning('Rarfile module not installed; unable to handle RAR archives.')
265
266    return archive
267
268
269def is_archive(path):
270    """
271    Attempts to open an entry as an archive; returns True on success, False on failure.
272    """
273
274    archive = None
275
276    try:
277        archive = open_archive(path)
278        if archive:
279            archive.close()
280            return True
281    except (OSError, ArchiveError) as error:
282        logger.debug('Failed to open file as archive: {} ({})', path, error)
283
284    return False
285