1# Copyright (C) 2018-2021, Stefan Schwarzer <sschwarzer@sschwarzer.net>
2# and ftputil contributors (see `doc/contributors.txt`)
3# See the file LICENSE for licensing terms.
4
5import sys
6import unittest.mock
7
8import ftputil.path_encoding
9
10
11__all__ = ["Call", "factory"]
12
13
14class Call:
15    def __init__(self, method_name, *, args=None, kwargs=None, result=None):
16        self.method_name = method_name
17        self.result = result
18        self.args = args
19        self.kwargs = kwargs
20
21    def __repr__(self):
22        return (
23            "{0.__class__.__name__}("
24            "method_name={0.method_name!r}, "
25            "result={0.result!r}, "
26            "args={0.args!r}, "
27            "kwargs={0.kwargs!r})".format(self)
28        )
29
30    def check_call(self, method_name, args=None, kwargs=None):
31        # TODO: Mention printing in the docstring.
32        # TODO: Describe how the comparison is made.
33        """
34        Check the method name, args and kwargs from this `Call` object against
35        the method name, args and kwargs from the system under test.
36
37        Raise an `AssertionError` if there's a mismatch.
38        """
39        print(
40            "  Call from session script:    {} | {!r} | {!r}".format(
41                self.method_name, self.args, self.kwargs
42            )
43        )
44        print(
45            "  Call from system under test: {} | {!r} | {!r}".format(
46                method_name, args, kwargs
47            )
48        )
49
50        def compare(value_name, script_value, sut_value):
51            if script_value is not None:
52                try:
53                    assert script_value == sut_value
54                except AssertionError:
55                    print(
56                        "  Mismatch for `{}`: {!r} != {!r}".format(
57                            value_name, script_value, sut_value
58                        )
59                    )
60                    raise
61
62        compare("method_name", self.method_name, method_name)
63        compare("args", self.args, args)
64        compare("kwargs", self.kwargs, kwargs)
65
66    @staticmethod
67    def _is_exception_class(obj):
68        """
69        Return `True` if `obj` is an exception class, else `False`.
70        """
71        try:
72            return issubclass(obj, Exception)
73        except TypeError:
74            # TypeError: issubclass() arg 1 must be a class
75            return False
76
77    def __call__(self):
78        """
79        Simulate call, returning the result or raising the exception.
80        """
81        if isinstance(self.result, Exception) or self._is_exception_class(self.result):
82            raise self.result
83        else:
84            return self.result
85
86
87class ScriptedSession:
88    """
89    "Scripted" `ftplib.FTP`-like class for testing.
90
91    To avoid actual input/output over sockets or files, specify the values that
92    should be returned by the class's methods.
93
94    The class is instantiated with a `script` argument. This is a list of
95    `Call` objects where each object specifies the name of the `ftplib.FTP`
96    method that is expected to be called and what the method should return. If
97    the value is an exception, it will be raised, not returned.
98
99    In case the method returns a socket (like `transfercmd`), the return value
100    to be specified in the `Call` instance is the content of the underlying
101    socket file.
102
103    The advantage of the approach of this class over the use of
104    `unittest.mock.Mock` objects is that the sequence of calls is clearly
105    visible. With `Mock` objects, the developer must keep in mind all the calls
106    when specifying return values or side effects for the mock methods.
107    """
108
109    # Class-level counter to enumerate `ScriptedSession`s. This makes it
110    # possible to make the output even more compact. Additionally, it's easier
111    # to distinguish numbers like 1, 2, etc. than hexadecimal ids.
112    _session_count = 0
113
114    encoding = ftputil.path_encoding.FTPLIB_DEFAULT_ENCODING
115
116    @classmethod
117    def reset_session_count(cls):
118        cls._session_count = 0
119
120    def __init__(self, script):
121        self.script = script
122        # `File.close` accesses the session `sock` object to set and reset the
123        # timeout. `sock` itself is never _called_ though, so it doesn't make
124        # sense to create a `sock` _call_.
125        self.sock = unittest.mock.Mock(name="socket_attribute")
126        # Index into `script`, the list of `Call` objects
127        self._call_index = 0
128        self.__class__._session_count += 1
129        self._session_count = self.__class__._session_count
130        # Always expect an entry for the constructor.
131        init_call = self._next_script_call("__init__")
132        # The constructor isn't supposed to return anything. The only reason to
133        # call it here is to raise an exception if that was specified in the
134        # `script`.
135        init_call()
136
137    def __str__(self):
138        return "{} {}".format(self.__class__.__name__, self._session_count)
139
140    def _next_script_call(self, requested_attribute):
141        """
142        Return next `Call` object.
143        """
144        print(self, "in `_next_script_call`")
145        try:
146            call = self.script[self._call_index]
147        except IndexError:
148            print("  *** Ran out of `Call` objects for this session {!r}".format(self))
149            print("  Requested attribute was {!r}".format(requested_attribute))
150            raise
151        self._call_index += 1
152        print(self, f"next call: {call!r}")
153        return call
154
155    def __getattr__(self, attribute_name):
156        script_call = self._next_script_call(attribute_name)
157
158        def dummy_method(*args, **kwargs):
159            print(self, "in `__getattr__`")
160            script_call.check_call(attribute_name, args, kwargs)
161            return script_call()
162
163        return dummy_method
164
165    # ----------------------------------------------------------------------
166    # `ftplib.FTP` methods that shouldn't be executed with the default
167    # processing in `__getattr__`
168
169    def dir(self, path, callback):
170        """
171        Call the `callback` for each line in the multiline string
172        `call.result`.
173        """
174        script_call = self._next_script_call("dir")
175        # Check only the path. This requires that the corresponding `Call`
176        # object also solely specifies the path as `args`.
177        script_call.check_call("dir", (path,), None)
178        # Give `dir` the chance to raise an exception if one was specified in
179        # the `Call`'s `result` argument.
180        call_result = script_call()
181        for line in call_result.splitlines():
182            callback(line)
183
184    def ntransfercmd(self, cmd, rest=None):
185        """
186        Simulate the `ftplib.FTP.ntransfercmd` call.
187
188        `ntransfercmd` returns a tuple of a socket and a size argument. The
189        `result` value given when constructing an `ntransfercmd` call specifies
190        an `io.TextIO` or `io.BytesIO` value to be used as the
191        `Socket.makefile` result.
192        """
193        script_call = self._next_script_call("ntransfercmd")
194        script_call.check_call("ntransfercmd", (cmd, rest), None)
195        # Give `ntransfercmd` the chance to raise an exception if one was
196        # specified in the `Call`'s `result` argument.
197        call_result = script_call()
198        mock_socket = unittest.mock.Mock(name="socket")
199        mock_socket.makefile.return_value = call_result
200        # Return `None` for size. The docstring of `ftplib.FTP.ntransfercmd`
201        # says that's a possibility.
202        # TODO: Use a sensible `size` value later if it turns out we need it.
203        return mock_socket, None
204
205    def transfercmd(self, cmd, rest=None):
206        """
207        Simulate the `ftplib.FTP.transfercmd` call.
208
209        `transfercmd` returns a socket. The `result` value given when
210        constructing an `transfercmd` call specifies an `io.TextIO` or
211        `io.BytesIO` value to be used as the `Socket.makefile` result.
212        """
213        script_call = self._next_script_call("transfercmd")
214        script_call.check_call("transfercmd", (cmd, rest), None)
215        # Give `transfercmd` the chance to raise an exception if one was
216        # specified in the `Call`'s `result` argument.
217        call_result = script_call()
218        mock_socket = unittest.mock.Mock(name="socket")
219        mock_socket.makefile.return_value = call_result
220        return mock_socket
221
222
223class MultisessionFactory:
224    """
225    Return a session factory using the scripted data from the given "scripts"
226    for each consecutive call ("creation") of a factory.
227
228    Example:
229
230      host = ftputil.FTPHost(host, user, password,
231                             session_factory=scripted_session.factory(script1, script2))
232
233    When the `session_factory` is "instantiated" for the first time by
234    `FTPHost._make_session`, the factory object will use the behavior described
235    by the script `script1`. When the `session_factory` is "instantiated" a
236    second time, the factory object will use the behavior described by the
237    script `script2`.
238    """
239
240    def __init__(self, *scripts):
241        ScriptedSession.reset_session_count()
242        self._scripts = iter(scripts)
243        self.scripted_sessions = []
244
245    def __call__(self, host, user, password):
246        """
247        Call the factory.
248
249        This is equivalent to the constructor of the session (e. g.
250        `ftplib.FTP` in a real application).
251        """
252        script = next(self._scripts)
253        scripted_session = ScriptedSession(script)
254        self.scripted_sessions.append(scripted_session)
255        return scripted_session
256
257
258factory = MultisessionFactory
259