# -*- test-case-name: twisted.python.test.test_zippath -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
This module contains implementations of L{IFilePath} for zip files.

See the constructor of L{ZipArchive} for use.
"""


import errno
import os
import time
from typing import Dict
from zipfile import ZipFile

from zope.interface import implementer

from twisted.python.compat import cmp, comparable
from twisted.python.filepath import (
    AbstractFilePath,
    FilePath,
    IFilePath,
    UnlistableError,
    _coerceToFilesystemEncoding,
)

ZIP_PATH_SEP = "/"  # In zipfiles, "/" is universally used as the
# path separator, regardless of platform.


@comparable
@implementer(IFilePath)
class ZipPath(AbstractFilePath):
    """
    I represent a file or directory contained within a zip file.

    @ivar archive: the L{ZipArchive} this path lives in.
    @ivar pathInArchive: the C{ZIP_PATH_SEP}-separated location of this entry
        inside the archive, as L{bytes} or L{unicode}.
    @ivar path: a fake OS-style path made of the archive's filename followed
        by the segments of C{pathInArchive}.
    """

    def __init__(self, archive, pathInArchive):
        """
        Don't construct me directly.  Use C{ZipArchive.child()}.

        @param archive: a L{ZipArchive} instance.

        @param pathInArchive: a ZIP_PATH_SEP-separated string.
        """
        self.archive = archive
        self.pathInArchive = pathInArchive

        # self.path pretends to be os-specific because that's the way the
        # 'zipimport' module does it.
        sep = _coerceToFilesystemEncoding(pathInArchive, ZIP_PATH_SEP)
        archiveFilename = _coerceToFilesystemEncoding(
            pathInArchive, archive.zipfile.filename
        )
        self.path = os.path.join(archiveFilename, *(self.pathInArchive.split(sep)))

    def __cmp__(self, other):
        """
        Order L{ZipPath} instances by their archive, then by their location
        within it.  Anything that is not a L{ZipPath} is not comparable.
        """
        if not isinstance(other, ZipPath):
            return NotImplemented
        return cmp(
            (self.archive, self.pathInArchive), (other.archive, other.pathInArchive)
        )

    def __repr__(self) -> str:
        parts = [
            _coerceToFilesystemEncoding(self.sep, os.path.abspath(self.archive.path))
        ]
        parts.extend(self.pathInArchive.split(self.sep))
        ossep = _coerceToFilesystemEncoding(self.sep, os.sep)
        return f"ZipPath({ossep.join(parts)!r})"

    @property
    def sep(self):
        """
        Return a zip directory separator.

        @return: The zip directory separator.
        @returntype: The same type as C{self.path}.
        """
        return _coerceToFilesystemEncoding(self.path, ZIP_PATH_SEP)

    def _nativePathInArchive(self):
        """
        Return C{self.pathInArchive} as text.

        The L{ZipFile} lookups in this class (C{NameToInfo}, C{open}) are all
        performed with this text form, whatever mode this path is in.

        @return: C{self.pathInArchive} coerced to text.
        """
        return _coerceToFilesystemEncoding("", self.pathInArchive)

    def _childmapKey(self):
        """
        Return C{self.pathInArchive} in the mode of the archive's C{path}.

        The keys of C{self.archive.childmap} are built from the archive's own
        C{path}, so a lookup must use the same mode even when this path was
        created in the other one.

        @return: C{self.pathInArchive} coerced to the type of
            C{self.archive.path}.
        """
        return _coerceToFilesystemEncoding(self.archive.path, self.pathInArchive)

    def parent(self):
        """
        Return the path containing this one.

        @return: the L{ZipArchive} itself when this path is a top-level
            entry, otherwise a L{ZipPath} with the last segment removed.
        """
        splitup = self.pathInArchive.split(self.sep)
        if len(splitup) == 1:
            return self.archive
        return ZipPath(self.archive, self.sep.join(splitup[:-1]))

    def child(self, path):
        """
        Return a new ZipPath representing a path in C{self.archive} which is
        a child of this path.

        @note: Requesting the C{".."} (or other special name) child will not
            cause L{InsecurePath} to be raised since these names do not have
            any special meaning inside a zip archive.  Be particularly
            careful with the C{path} attribute (if you absolutely must use
            it) as this means it may include special names with special
            meaning outside of the context of a zip archive.

        @param path: the name of the child; the returned L{ZipPath} is in the
            mode (L{bytes} or L{unicode}) of this argument.
        """
        joiner = _coerceToFilesystemEncoding(path, ZIP_PATH_SEP)
        pathInArchive = _coerceToFilesystemEncoding(path, self.pathInArchive)
        return ZipPath(self.archive, joiner.join([pathInArchive, path]))

    def sibling(self, path):
        """
        Return the child named C{path} of this path's parent.
        """
        return self.parent().child(path)

    def exists(self):
        """
        Return C{True} if this path names a directory or a file in the
        archive.
        """
        return self.isdir() or self.isfile()

    def isdir(self):
        """
        Return C{True} if at least one archive entry lives below this path.
        """
        # Coerce first: the childmap keys follow the archive's mode, which
        # may differ from the mode this path was created in.
        return self._childmapKey() in self.archive.childmap

    def isfile(self):
        """
        Return C{True} if this path is the name of an entry in the zip file.
        """
        # Look up the text form, exactly as open(), getsize() and
        # getModificationTime() do; a raw bytes path would never match.
        return self._nativePathInArchive() in self.archive.zipfile.NameToInfo

    def islink(self):
        """
        Always C{False}.
        """
        return False

    def listdir(self):
        """
        List the names of the entries directly below this path.

        @return: a L{list} of names, in the same mode as
            C{self.pathInArchive}.

        @raise UnlistableError: wrapping an L{OSError} with C{ENOENT} if this
            path does not exist, or C{ENOTDIR} if it exists but is not a
            directory.
        """
        if not self.exists():
            raise UnlistableError(
                OSError(errno.ENOENT, "Non-existent zip entry listed")
            )
        if not self.isdir():
            raise UnlistableError(OSError(errno.ENOTDIR, "Leaf zip entry listed"))
        return [
            _coerceToFilesystemEncoding(self.pathInArchive, name)
            for name in self.archive.childmap[self._childmapKey()]
        ]

    def splitext(self):
        """
        Return a value similar to that returned by C{os.path.splitext}.
        """
        # This happens to work out because of the fact that we use OS-specific
        # path separators in the constructor to construct our fake 'path'
        # attribute.
        return os.path.splitext(self.path)

    def basename(self):
        """
        Return the last segment of C{self.pathInArchive}.
        """
        return self.pathInArchive.split(self.sep)[-1]

    def dirname(self):
        """
        Return the C{path} attribute of this path's parent.
        """
        # XXX NOTE: This API isn't a very good idea on filepath, but it's even
        # less meaningful here.
        return self.parent().path

    def open(self, mode="r"):
        """
        Open this entry through the underlying L{ZipFile}.

        @param mode: passed through to L{ZipFile.open}.

        @return: whatever L{ZipFile.open} returns for this entry.
        """
        return self.archive.zipfile.open(self._nativePathInArchive(), mode=mode)

    def changed(self):
        """
        Do nothing.
        """
        pass

    def getsize(self):
        """
        Retrieve this file's size.

        @return: file size, in bytes
        """
        return self.archive.zipfile.NameToInfo[self._nativePathInArchive()].file_size

    def getAccessTime(self):
        """
        Retrieve this file's last access-time.  This is the same as the last
        access time for the archive.

        @return: a number of seconds since the epoch
        """
        return self.archive.getAccessTime()

    def getModificationTime(self):
        """
        Retrieve this file's last modification time.  This is the time of
        modification recorded in the zipfile.

        @return: a number of seconds since the epoch.
        """
        info = self.archive.zipfile.NameToInfo[self._nativePathInArchive()]
        # Three zeros are appended to the entry's date_time tuple to complete
        # the time tuple handed to time.mktime.
        return time.mktime(info.date_time + (0, 0, 0))

    def getStatusChangeTime(self):
        """
        Retrieve this file's last modification time.  This name is provided
        for compatibility, and returns the same value as
        L{getModificationTime}.

        @return: a number of seconds since the epoch.
        """
        return self.getModificationTime()


class ZipArchive(ZipPath):
    """
    I am a L{FilePath}-like object which can wrap a zip archive as if it were a
    directory.

    It works similarly to L{FilePath} in L{bytes} and L{unicode} handling --
    instantiating with a L{bytes} will return a "bytes mode" L{ZipArchive},
    and instantiating with a L{unicode} will return a "text mode"
    L{ZipArchive}. Methods that return new L{ZipArchive} or L{ZipPath}
    instances will be in the mode of the argument to the creator method,
    converting if required.
    """

    @property
    def archive(self):
        """
        A L{ZipArchive} is its own archive.
        """
        return self

    def __init__(self, archivePathname):
        """
        Create a ZipArchive, treating the archive at archivePathname as a zip
        file.

        @param archivePathname: a L{bytes} or L{unicode}, naming a path in the
            filesystem.
        """
        self.path = archivePathname
        self.zipfile = ZipFile(_coerceToFilesystemEncoding("", archivePathname))
        self.pathInArchive = _coerceToFilesystemEncoding(archivePathname, "")
        # zipfile is already wasting O(N) memory on cached ZipInfo instances,
        # so there's no sense in trying to do this lazily or intelligently
        #
        # Maps each directory's path within the archive (the empty string for
        # the root) to a dict whose keys are the names directly below it.
        # The keys are coerced to the mode of self.path.
        self.childmap: Dict[str, Dict[str, int]] = {}

        sep = self.sep
        for name in self.zipfile.namelist():
            segments = _coerceToFilesystemEncoding(self.path, name).split(sep)
            # Register every segment as a child of the path made of the
            # segments before it; the first segment's parent is the root.
            for depth, child in enumerate(segments):
                parent = sep.join(segments[:depth])
                self.childmap.setdefault(parent, {})[child] = 1

    def child(self, path):
        """
        Create a ZipPath pointing at a path within the archive.

        @param path: a L{bytes} or L{unicode} with no path separators in it
            (either '/' or the system path separator, if it's different).
        """
        return ZipPath(self, path)

    def exists(self):
        """
        Returns C{True} if the underlying archive exists.
        """
        return FilePath(self.zipfile.filename).exists()

    def getAccessTime(self):
        """
        Return the archive file's last access time.
        """
        return FilePath(self.zipfile.filename).getAccessTime()

    def getModificationTime(self):
        """
        Return the archive file's modification time.
        """
        return FilePath(self.zipfile.filename).getModificationTime()

    def getStatusChangeTime(self):
        """
        Return the archive file's status change time.
        """
        return FilePath(self.zipfile.filename).getStatusChangeTime()

    def __repr__(self) -> str:
        return f"ZipArchive({os.path.abspath(self.path)!r})"


__all__ = ["ZipArchive", "ZipPath"]