# Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.

"""
A stub SFTP server for loopback SFTP testing.
"""

import os

from paramiko import (
    AUTH_SUCCESSFUL,
    OPEN_SUCCEEDED,
    SFTPAttributes,
    SFTPHandle,
    SFTPServer,
    SFTPServerInterface,
    SFTP_FAILURE,
    SFTP_OK,
    ServerInterface,
)
from paramiko.common import o666


def _close_fd(fd):
    """Close the raw descriptor ``fd``, ignoring an already-closed one."""
    try:
        os.close(fd)
    except OSError:
        pass


class StubServer(ServerInterface):
    """Server interface that accepts every password and channel request."""

    def check_auth_password(self, username, password):
        """Return ``AUTH_SUCCESSFUL`` for any username/password pair."""
        # all are allowed
        return AUTH_SUCCESSFUL

    def check_channel_request(self, kind, chanid):
        """Return ``OPEN_SUCCEEDED`` for any channel kind."""
        return OPEN_SUCCEEDED


class StubSFTPHandle(SFTPHandle):
    """
    File handle returned by `StubSFTPServer.open`.

    `StubSFTPServer.open` sets ``filename``, ``readfile`` and ``writefile``
    on the instance after constructing it.
    """

    def stat(self):
        """
        Return attributes of the open file, taken from ``os.fstat`` on
        ``self.readfile``; on ``OSError`` return the converted error code.
        """
        try:
            return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def chattr(self, attr):
        """
        Apply ``attr`` to the file via its stored filename.

        Returns ``SFTP_OK`` on success, otherwise the converted error code.
        """
        # SFTPServer.set_file_attr is called with a path here, so we have to
        # use the stored filename rather than the open descriptor
        try:
            SFTPServer.set_file_attr(self.filename, attr)
            return SFTP_OK
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)


class StubSFTPServer(SFTPServerInterface):
    """
    SFTP server interface that maps every client path below ``ROOT`` on the
    local filesystem.

    Every operation returns its result (or ``SFTP_OK``) on success and the
    value of ``SFTPServer.convert_errno`` when the underlying ``os`` call
    raises ``OSError``.
    """

    # assume current folder is a fine root
    # (the tests always create and eventually delete a subfolder, so there
    # shouldn't be any mess)
    ROOT = os.getcwd()

    def _realpath(self, path):
        """Return the local path for ``path``: ``ROOT`` + canonical form."""
        return self.ROOT + self.canonicalize(path)

    def list_folder(self, path):
        """
        Return a list of `SFTPAttributes`, one per entry of the folder, each
        with ``filename`` set to the entry name.
        """
        path = self._realpath(path)
        try:
            out = []
            for fname in os.listdir(path):
                attr = SFTPAttributes.from_stat(
                    os.stat(os.path.join(path, fname))
                )
                attr.filename = fname
                out.append(attr)
            return out
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def stat(self, path):
        """Return `SFTPAttributes` built from ``os.stat`` of ``path``."""
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.stat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def lstat(self, path):
        """Return `SFTPAttributes` built from ``os.lstat`` of ``path``."""
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.lstat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def open(self, path, flags, attr):
        """
        Open ``path`` with the given ``os.open`` flags and return a
        `StubSFTPHandle` wrapping it.

        ``attr.st_mode`` is used as the creation mode when present,
        otherwise ``o666``. When ``O_CREAT`` is set and ``attr`` is given,
        the remaining attributes (permissions excluded) are applied to the
        file. On any ``OSError`` the raw descriptor is closed (if it was
        opened) and the converted error code is returned.
        """
        path = self._realpath(path)
        try:
            binary_flag = getattr(os, "O_BINARY", 0)
            flags |= binary_flag
            mode = getattr(attr, "st_mode", None)
            if mode is not None:
                fd = os.open(path, flags, mode)
            else:
                # os.open() defaults to 0777 which is
                # an odd default mode for files
                fd = os.open(path, flags, o666)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        if (flags & os.O_CREAT) and (attr is not None):
            # permissions were already handled by os.open() above
            attr._flags &= ~attr.FLAG_PERMISSIONS
            try:
                SFTPServer.set_file_attr(path, attr)
            except OSError as e:
                # don't leak the descriptor we just opened
                _close_fd(fd)
                return SFTPServer.convert_errno(e.errno)
        # pick the stdio mode string matching the requested access flags
        if flags & os.O_WRONLY:
            if flags & os.O_APPEND:
                fstr = "ab"
            else:
                fstr = "wb"
        elif flags & os.O_RDWR:
            if flags & os.O_APPEND:
                fstr = "a+b"
            else:
                fstr = "r+b"
        else:
            # O_RDONLY (== 0)
            fstr = "rb"
        try:
            f = os.fdopen(fd, fstr)
        except OSError as e:
            # os.fdopen() can fail after os.open() succeeded (e.g. the path
            # is a directory); close the raw descriptor so it isn't leaked
            _close_fd(fd)
            return SFTPServer.convert_errno(e.errno)
        fobj = StubSFTPHandle(flags)
        fobj.filename = path
        fobj.readfile = f
        fobj.writefile = f
        return fobj

    def remove(self, path):
        """Delete the file at ``path``."""
        path = self._realpath(path)
        try:
            os.remove(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rename(self, oldpath, newpath):
        """
        Rename ``oldpath`` to ``newpath``; return ``SFTP_FAILURE`` without
        renaming if ``newpath`` already exists.
        """
        oldpath = self._realpath(oldpath)
        newpath = self._realpath(newpath)
        if os.path.exists(newpath):
            return SFTP_FAILURE
        try:
            os.rename(oldpath, newpath)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def posix_rename(self, oldpath, newpath):
        """
        Rename ``oldpath`` to ``newpath`` with ``os.rename``, without first
        checking whether ``newpath`` exists.
        """
        oldpath = self._realpath(oldpath)
        newpath = self._realpath(newpath)
        try:
            os.rename(oldpath, newpath)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def mkdir(self, path, attr):
        """Create the folder ``path`` and apply ``attr`` when it is given."""
        path = self._realpath(path)
        try:
            os.mkdir(path)
            if attr is not None:
                SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rmdir(self, path):
        """Remove the folder ``path``."""
        path = self._realpath(path)
        try:
            os.rmdir(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def chattr(self, path, attr):
        """Apply ``attr`` to the file at ``path``."""
        path = self._realpath(path)
        try:
            SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def symlink(self, target_path, path):
        """
        Create a symlink at ``path`` pointing to ``target_path``.

        A target starting with ``/`` is re-rooted under ``ROOT``. Any other
        target is joined to the folder of ``path``; if the joined string
        does not start with ``ROOT`` the link is created pointing at
        ``"<error>"`` instead.
        """
        path = self._realpath(path)
        if (len(target_path) > 0) and (target_path[0] == "/"):
            # absolute symlink
            target_path = os.path.join(self.ROOT, target_path[1:])
            if target_path[:2] == "//":
                # bug in os.path.join
                target_path = target_path[1:]
        else:
            # compute relative to path
            # NOTE: this is a purely lexical prefix check on the joined
            # string; ".." components are not resolved first
            abspath = os.path.join(os.path.dirname(path), target_path)
            if abspath[: len(self.ROOT)] != self.ROOT:
                # this symlink isn't going to work anyway -- just break it
                # immediately
                target_path = "<error>"
        try:
            os.symlink(target_path, path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def readlink(self, path):
        """
        Return the target of the symlink at ``path``.

        Relative targets are returned unchanged. Absolute targets inside
        ``ROOT`` are returned with the ``ROOT`` prefix removed (and a leading
        ``/``); absolute targets outside ``ROOT`` yield ``"<error>"``.
        """
        path = self._realpath(path)
        try:
            symlink = os.readlink(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        # if it's absolute, remove the root
        if os.path.isabs(symlink):
            root = self.ROOT
            remainder = symlink[len(root) :]
            # a bare string-prefix match would also accept siblings such as
            # ROOT + "bar"; require the match to end on a path boundary
            at_boundary = (
                not remainder
                or remainder[0] in ("/", os.sep)
                or root.endswith(("/", os.sep))
            )
            if symlink.startswith(root) and at_boundary:
                if not remainder.startswith("/"):
                    remainder = "/" + remainder
                symlink = remainder
            else:
                symlink = "<error>"
        return symlink