1# Copyright (C) 2003-2009  Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
18
19"""
20A stub SFTP server for loopback SFTP testing.
21"""
22
23import os
24
25from paramiko import (
26    AUTH_SUCCESSFUL,
27    OPEN_SUCCEEDED,
28    SFTPAttributes,
29    SFTPHandle,
30    SFTPServer,
31    SFTPServerInterface,
32    SFTP_FAILURE,
33    SFTP_OK,
34    ServerInterface,
35)
36from paramiko.common import o666
37
38
class StubServer(ServerInterface):
    """Permissive SSH server policy used for loopback testing."""

    def check_auth_password(self, username, password):
        """Accept every username/password pair."""
        return AUTH_SUCCESSFUL

    def check_channel_request(self, kind, chanid):
        """Accept every channel request, whatever its kind."""
        return OPEN_SUCCEEDED
46
47
class StubSFTPHandle(SFTPHandle):
    """SFTP file handle backed by a local file object.

    The methods here read the ``readfile`` and ``filename`` attributes,
    which must be assigned by whoever creates the handle.
    """

    def stat(self):
        """Return the attributes of the open file, or an SFTP error code."""
        try:
            descriptor = self.readfile.fileno()
            raw_stat = os.fstat(descriptor)
            return SFTPAttributes.from_stat(raw_stat)
        except OSError as err:
            return SFTPServer.convert_errno(err.errno)

    def chattr(self, attr):
        """Apply ``attr`` to the file; return ``SFTP_OK`` or an error code."""
        # Attributes are applied by path, using the stored filename, rather
        # than through the open file descriptor.
        try:
            SFTPServer.set_file_attr(self.filename, attr)
        except OSError as err:
            return SFTPServer.convert_errno(err.errno)
        return SFTP_OK
63
64
class StubSFTPServer(SFTPServerInterface):
    """Filesystem-backed SFTP server interface for loopback testing.

    Every SFTP path is mapped to a local path beneath ``ROOT``.  Each
    operation returns its result (or ``SFTP_OK``) on success, and the SFTP
    error code produced by ``SFTPServer.convert_errno`` when the underlying
    OS call raises ``OSError``.
    """

    # assume current folder is a fine root
    # (the tests always create and eventually delete a subfolder, so there
    # shouldn't be any mess)
    ROOT = os.getcwd()

    def _realpath(self, path):
        """Map an SFTP path to the corresponding local path under ROOT."""
        return self.ROOT + self.canonicalize(path)

    def _inside_root(self, path):
        """Return True if local ``path`` is ROOT or lies beneath it.

        Both sides are normalized first so that ``..`` components cannot
        hide an escape, and the comparison is made on a path-component
        boundary so that a sibling whose name merely starts with ROOT's
        name does not count as being inside.
        """
        root = os.path.normpath(self.ROOT)
        candidate = os.path.normpath(path)
        if candidate == root:
            return True
        prefix = root if root.endswith(os.sep) else root + os.sep
        return candidate.startswith(prefix)

    def list_folder(self, path):
        """Return a list of ``SFTPAttributes``, one per entry in ``path``.

        Each entry's ``filename`` is set to its bare name.  Entries are
        described with ``os.stat``; if that fails for an entry (e.g. a
        dangling symlink), ``os.lstat`` is used for it instead.  Returns
        an SFTP error code on ``OSError``.
        """
        path = self._realpath(path)
        try:
            out = []
            for fname in os.listdir(path):
                full = os.path.join(path, fname)
                try:
                    info = os.stat(full)
                except OSError:
                    # symlink() below can deliberately create dangling
                    # links; describe the link itself rather than failing
                    # the whole listing.  If lstat fails too, the outer
                    # handler reports the error as before.
                    info = os.lstat(full)
                attr = SFTPAttributes.from_stat(info)
                attr.filename = fname
                out.append(attr)
            return out
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def stat(self, path):
        """Return attributes of ``path`` (following symlinks) or an error."""
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.stat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def lstat(self, path):
        """Return attributes of ``path`` itself (no symlink following)."""
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.lstat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def open(self, path, flags, attr):
        """Open ``path`` and return a ``StubSFTPHandle`` wrapping it.

        :param path: SFTP path of the file to open.
        :param flags: ``os.O_*`` flags; ``os.O_BINARY`` is added where the
            platform defines it.
        :param attr: requested attributes, or None.  Its ``st_mode``, when
            present, is used as the creation mode.  Note that ``attr`` is
            modified in place when the file is opened with ``O_CREAT``.
        :returns: the new handle, or an SFTP error code on ``OSError``.
        """
        path = self._realpath(path)
        try:
            binary_flag = getattr(os, "O_BINARY", 0)
            flags |= binary_flag
            mode = getattr(attr, "st_mode", None)
            if mode is None:
                # os.open() defaults to 0777 which is
                # an odd default mode for files
                mode = o666
            fd = os.open(path, flags, mode)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        try:
            if (flags & os.O_CREAT) and (attr is not None):
                # The mode was already handed to os.open(); clear the
                # permissions flag before applying the remaining
                # attributes (presumably so they are not applied twice).
                attr._flags &= ~attr.FLAG_PERMISSIONS
                SFTPServer.set_file_attr(path, attr)
            if flags & os.O_WRONLY:
                fstr = "ab" if flags & os.O_APPEND else "wb"
            elif flags & os.O_RDWR:
                fstr = "a+b" if flags & os.O_APPEND else "r+b"
            else:
                # O_RDONLY (== 0)
                fstr = "rb"
            f = os.fdopen(fd, fstr)
        except OSError as e:
            # Neither a failed set_file_attr() nor a failed fdopen() (for
            # instance when ``path`` is a directory) takes ownership of
            # the descriptor, so release it here instead of leaking it.
            try:
                os.close(fd)
            except OSError:
                # best effort: the descriptor is already gone
                pass
            return SFTPServer.convert_errno(e.errno)
        fobj = StubSFTPHandle(flags)
        fobj.filename = path
        fobj.readfile = f
        fobj.writefile = f
        return fobj

    def remove(self, path):
        """Delete the file at ``path``."""
        path = self._realpath(path)
        try:
            os.remove(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rename(self, oldpath, newpath):
        """Rename ``oldpath`` to ``newpath``, refusing to overwrite.

        Returns ``SFTP_FAILURE`` if anything already exists at
        ``newpath``, including a dangling symlink.
        """
        oldpath = self._realpath(oldpath)
        newpath = self._realpath(newpath)
        # lexists() rather than exists(): exists() is False for a dangling
        # symlink, which os.rename() would then silently replace.
        if os.path.lexists(newpath):
            return SFTP_FAILURE
        try:
            os.rename(oldpath, newpath)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def posix_rename(self, oldpath, newpath):
        """Rename ``oldpath`` to ``newpath`` with plain ``os.rename``.

        Unlike ``rename``, no check is made for an existing ``newpath``.
        """
        oldpath = self._realpath(oldpath)
        newpath = self._realpath(newpath)
        try:
            os.rename(oldpath, newpath)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def mkdir(self, path, attr):
        """Create directory ``path`` and apply ``attr`` if it is not None."""
        path = self._realpath(path)
        try:
            os.mkdir(path)
            if attr is not None:
                SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rmdir(self, path):
        """Remove the directory at ``path``."""
        path = self._realpath(path)
        try:
            os.rmdir(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def chattr(self, path, attr):
        """Apply the attributes in ``attr`` to ``path``."""
        path = self._realpath(path)
        try:
            SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def symlink(self, target_path, path):
        """Create a symlink at ``path`` pointing to ``target_path``.

        A target starting with ``/`` is re-rooted under ``ROOT``.  A
        relative target is stored as given, unless resolving it against
        the link's directory would leave ``ROOT``; in that case the link
        is created pointing at the literal name ``"<error>"`` so that it
        is broken from the start.
        """
        path = self._realpath(path)
        if (len(target_path) > 0) and (target_path[0] == "/"):
            # absolute symlink
            target_path = os.path.join(self.ROOT, target_path[1:])
            if target_path[:2] == "//":
                # bug in os.path.join
                target_path = target_path[1:]
        else:
            # compute relative to path
            abspath = os.path.join(os.path.dirname(path), target_path)
            # The un-normalized join always starts with ROOT, so a plain
            # prefix test could never detect "../.." escapes; the helper
            # normalizes before comparing.
            if not self._inside_root(abspath):
                # this symlink isn't going to work anyway -- just break it
                # immediately
                target_path = "<error>"
        try:
            os.symlink(target_path, path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def readlink(self, path):
        """Return the target of the symlink at ``path``.

        An absolute target that starts with ``ROOT`` is returned with the
        ``ROOT`` prefix removed (and a leading ``/`` ensured); any other
        absolute target is reported as ``"<error>"``.  Relative targets
        are returned unchanged.
        """
        path = self._realpath(path)
        try:
            symlink = os.readlink(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        # if it's absolute, remove the root
        if os.path.isabs(symlink):
            if symlink[: len(self.ROOT)] == self.ROOT:
                symlink = symlink[len(self.ROOT) :]
                if (len(symlink) == 0) or (symlink[0] != "/"):
                    symlink = "/" + symlink
            else:
                symlink = "<error>"
        return symlink
233