1#!/usr/bin/env python 2# 3# Copyright 2009 Google Inc. All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17# pylint: disable-msg=W0612,W0613,C6409 18 19"""A fake shutil module implementation that uses fake_filesystem for unit tests. 20 21Includes: 22 FakeShutil: Uses a FakeFilesystem to provide a fake replacement for the 23 shutil module. 24 25Usage: 26>>> from pyfakefs import fake_filesystem 27>>> from pyfakefs import fake_filesystem_shutil 28>>> filesystem = fake_filesystem.FakeFilesystem() 29>>> shutil_module = fake_filesystem_shutil.FakeShutilModule(filesystem) 30 31Copy a fake_filesystem directory tree: 32>>> new_file = filesystem.CreateFile('/src/new-file') 33>>> shutil_module.copytree('/src', '/dst') 34>>> filesystem.Exists('/dst/new-file') 35True 36 37Remove a fake_filesystem directory tree: 38>>> shutil_module.rmtree('/src') 39>>> filesystem.Exists('/src/new-file') 40False 41""" 42 43import errno 44import os 45import shutil 46import stat 47 48__pychecker__ = 'no-reimportself' 49 50_PERM_WRITE = 0o200 # Write permission bit. 51_PERM_READ = 0o400 # Read permission bit. 52_PERM_ALL = 0o7777 # All permission bits. 53 54 55class FakeShutilModule(object): 56 """Uses a FakeFilesystem to provide a fake replacement for shutil module.""" 57 58 def __init__(self, filesystem): 59 """Construct fake shutil module using the fake filesystem. 60 61 Args: 62 filesystem: FakeFilesystem used to provide file system information 63 """ 64 self.filesystem = filesystem 65 self._shutil_module = shutil 66 67 def rmtree(self, path, ignore_errors=False, onerror=None): 68 """Remove a directory and all its contents. 69 70 Args: 71 path: (str) Directory tree to remove. 72 ignore_errors: (bool) unimplemented 73 onerror: (func) unimplemented 74 """ 75 self.filesystem.RemoveObject(path) 76 77 def copy(self, src, dst): 78 """Copy data and mode bits ("cp src dst"). 79 80 Args: 81 src: (str) source file 82 dst: (str) destination, may be a directory 83 """ 84 if self.filesystem.Exists(dst): 85 if stat.S_ISDIR(self.filesystem.GetObject(dst).st_mode): 86 dst = self.filesystem.JoinPaths(dst, os.path.basename(src)) 87 self.copyfile(src, dst) 88 src_object = self.filesystem.GetObject(src) 89 dst_object = self.filesystem.GetObject(dst) 90 dst_object.st_mode = ((dst_object.st_mode & ~_PERM_ALL) | 91 (src_object.st_mode & _PERM_ALL)) 92 93 def copyfile(self, src, dst): 94 """Copy data from src to dst. 95 96 Args: 97 src: (str) source file 98 dst: (dst) destination file 99 100 Raises: 101 IOError: if the file can't be copied 102 shutil.Error: if the src and dst files are the same 103 """ 104 src_file_object = self.filesystem.GetObject(src) 105 if not src_file_object.st_mode & _PERM_READ: 106 raise IOError(errno.EACCES, 'Permission denied', src) 107 if stat.S_ISDIR(src_file_object.st_mode): 108 raise IOError(errno.EISDIR, 'Is a directory', src) 109 110 dst_dir = os.path.dirname(dst) 111 if dst_dir: 112 if not self.filesystem.Exists(dst_dir): 113 raise IOError(errno.ENOTDIR, 'Not a directory', dst) 114 dst_dir_object = self.filesystem.GetObject(dst_dir) 115 if not dst_dir_object.st_mode & _PERM_WRITE: 116 raise IOError(errno.EACCES, 'Permission denied', dst_dir) 117 118 abspath_src = self.filesystem.NormalizePath( 119 self.filesystem.ResolvePath(src)) 120 abspath_dst = self.filesystem.NormalizePath( 121 self.filesystem.ResolvePath(dst)) 122 if abspath_src == abspath_dst: 123 raise shutil.Error('`%s` and `%s` are the same file' % (src, dst)) 124 125 if self.filesystem.Exists(dst): 126 dst_file_object = self.filesystem.GetObject(dst) 127 if stat.S_ISDIR(dst_file_object.st_mode): 128 raise IOError(errno.EISDIR, 'Is a directory', dst) 129 if not dst_file_object.st_mode & _PERM_WRITE: 130 raise IOError(errno.EACCES, 'Permission denied', dst) 131 dst_file_object.SetContents(src_file_object.contents) 132 133 else: 134 self.filesystem.CreateFile(dst, contents=src_file_object.contents) 135 136 def copystat(self, src, dst): 137 """Copy all stat info (mode bits, atime, and mtime) from src to dst. 138 139 Args: 140 src: (str) source file 141 dst: (str) destination file 142 """ 143 src_object = self.filesystem.GetObject(src) 144 dst_object = self.filesystem.GetObject(dst) 145 dst_object.st_mode = ((dst_object.st_mode & ~_PERM_ALL) | 146 (src_object.st_mode & _PERM_ALL)) 147 dst_object.st_uid = src_object.st_uid 148 dst_object.st_gid = src_object.st_gid 149 dst_object.st_atime = src_object.st_atime 150 dst_object.st_mtime = src_object.st_mtime 151 152 def copy2(self, src, dst): 153 """Copy data and all stat info ("cp -p src dst"). 154 155 Args: 156 src: (str) source file 157 dst: (str) destination, may be a directory 158 """ 159 if self.filesystem.Exists(dst): 160 if stat.S_ISDIR(self.filesystem.GetObject(dst).st_mode): 161 dst = self.filesystem.JoinPaths(dst, os.path.basename(src)) 162 self.copyfile(src, dst) 163 self.copystat(src, dst) 164 165 def copytree(self, src, dst, symlinks=False): 166 """Recursively copy a directory tree. 167 168 Args: 169 src: (str) source directory 170 dst: (str) destination directory, must not already exist 171 symlinks: (bool) copy symlinks as symlinks instead of copying the 172 contents of the linked files. Currently unused. 173 174 Raises: 175 OSError: if src is missing or isn't a directory 176 """ 177 self.filesystem.CreateDirectory(dst) 178 try: 179 directory = self.filesystem.GetObject(src) 180 except IOError as e: 181 raise OSError(e.errno, e.message) 182 if not stat.S_ISDIR(directory.st_mode): 183 raise OSError(errno.ENOTDIR, 184 'Fake os module: %r not a directory' % src) 185 for name in directory.contents: 186 srcname = self.filesystem.JoinPaths(src, name) 187 dstname = self.filesystem.JoinPaths(dst, name) 188 src_mode = self.filesystem.GetObject(srcname).st_mode 189 if stat.S_ISDIR(src_mode): 190 self.copytree(srcname, dstname, symlinks) 191 else: 192 self.copy2(srcname, dstname) 193 194 def move(self, src, dst): 195 """Rename a file or directory. 196 197 Args: 198 src: (str) source file or directory 199 dst: (str) if the src is a directory, the dst must not already exist 200 """ 201 if stat.S_ISDIR(self.filesystem.GetObject(src).st_mode): 202 self.copytree(src, dst, symlinks=True) 203 else: 204 self.copy2(src, dst) 205 self.filesystem.RemoveObject(src) 206 207 def __getattr__(self, name): 208 """Forwards any non-faked calls to the standard shutil module.""" 209 return getattr(self._shutil_module, name) 210 211 212def _RunDoctest(): 213 # pylint: disable-msg=C6111,C6204,W0406 214 import doctest 215 from pyfakefs import fake_filesystem_shutil 216 return doctest.testmod(fake_filesystem_shutil) 217 218 219if __name__ == '__main__': 220 _RunDoctest() 221