# Copyright (C) 2018-2021, Stefan Schwarzer <sschwarzer@sschwarzer.net>
# and ftputil contributors (see `doc/contributors.txt`)
# See the file LICENSE for licensing terms.

"""
Scripted, `ftplib.FTP`-like session objects for tests.

A test describes the expected sequence of session method calls as a list of
`Call` objects (a "script") and passes one or more scripts to `factory`. The
resulting object can be used wherever a session factory is expected.
"""

import sys
import unittest.mock

import ftputil.path_encoding


__all__ = ["Call", "factory"]


class Call:
    """
    A single expected method call in a session script.

    `method_name` is the name of the session method that is expected to be
    called. `args` and `kwargs` are the expected positional and keyword
    arguments; a value of `None` means "don't check". `result` is the value
    to return from the simulated call. If `result` is an exception instance
    or an exception class, it's raised instead of returned.
    """

    def __init__(self, method_name, *, args=None, kwargs=None, result=None):
        self.method_name = method_name
        self.result = result
        self.args = args
        self.kwargs = kwargs

    def __repr__(self):
        return (
            "{0.__class__.__name__}("
            "method_name={0.method_name!r}, "
            "result={0.result!r}, "
            "args={0.args!r}, "
            "kwargs={0.kwargs!r})".format(self)
        )

    def check_call(self, method_name, args=None, kwargs=None):
        """
        Check the method name, args and kwargs from this `Call` object against
        the method name, args and kwargs from the system under test.

        Both the scripted call and the call from the system under test are
        printed to standard output. If there's a mismatch, the offending
        value pair is printed as well.

        Each of the three values is compared with `==`. A scripted value of
        `None` isn't compared at all, i. e. it matches anything.

        Raise an `AssertionError` if there's a mismatch.
        """
        print(
            " Call from session script: {} | {!r} | {!r}".format(
                self.method_name, self.args, self.kwargs
            )
        )
        print(
            " Call from system under test: {} | {!r} | {!r}".format(
                method_name, args, kwargs
            )
        )

        def compare(value_name, script_value, sut_value):
            # `None` in the script means "don't care".
            if script_value is None:
                return
            # Raise explicitly instead of using an `assert` statement.
            # `assert` statements are removed when Python runs with `-O`, which
            # would turn every mismatch into a silent pass.
            if not (script_value == sut_value):
                print(
                    " Mismatch for `{}`: {!r} != {!r}".format(
                        value_name, script_value, sut_value
                    )
                )
                raise AssertionError(
                    "mismatch for `{}`: {!r} != {!r}".format(
                        value_name, script_value, sut_value
                    )
                )

        compare("method_name", self.method_name, method_name)
        compare("args", self.args, args)
        compare("kwargs", self.kwargs, kwargs)

    @staticmethod
    def _is_exception_class(obj):
        """
        Return `True` if `obj` is an exception class, else `False`.
        """
        try:
            return issubclass(obj, Exception)
        except TypeError:
            # TypeError: issubclass() arg 1 must be a class
            return False

    def __call__(self):
        """
        Simulate call, returning the result or raising the exception.
        """
        result = self.result
        if isinstance(result, Exception) or self._is_exception_class(result):
            raise result
        return result


class ScriptedSession:
    """
    "Scripted" `ftplib.FTP`-like class for testing.

    To avoid actual input/output over sockets or files, specify the values that
    should be returned by the class's methods.

    The class is instantiated with a `script` argument. This is a list of
    `Call` objects where each object specifies the name of the `ftplib.FTP`
    method that is expected to be called and what the method should return. If
    the value is an exception, it will be raised, not returned.

    In case the method returns a socket (like `transfercmd`), the return value
    to be specified in the `Call` instance is the content of the underlying
    socket file.

    The advantage of the approach of this class over the use of
    `unittest.mock.Mock` objects is that the sequence of calls is clearly
    visible. With `Mock` objects, the developer must keep in mind all the calls
    when specifying return values or side effects for the mock methods.
    """

    # Class-level counter to enumerate `ScriptedSession`s. This makes it
    # possible to make the output even more compact. Additionally, it's easier
    # to distinguish numbers like 1, 2, etc. than hexadecimal ids.
    _session_count = 0

    encoding = ftputil.path_encoding.FTPLIB_DEFAULT_ENCODING

    @classmethod
    def reset_session_count(cls):
        """
        Restart the numbering of `ScriptedSession` objects.
        """
        cls._session_count = 0

    def __init__(self, script):
        """
        Set up the session and consume the first `Call` of `script`.

        The first `Call` is taken as the entry for the constructor. If its
        `result` is an exception, the exception is raised from here.
        """
        # `script` and `_call_index` must be set before the first use of
        # `_next_script_call` below.
        self.script = script
        # `File.close` accesses the session `sock` object to set and reset the
        # timeout. `sock` itself is never _called_ though, so it doesn't make
        # sense to create a `sock` _call_.
        self.sock = unittest.mock.Mock(name="socket_attribute")
        # Index into `script`, the list of `Call` objects
        self._call_index = 0
        # Bump the class-level counter and remember this session's own number
        # in an instance attribute of the same name.
        self.__class__._session_count += 1
        self._session_count = self.__class__._session_count
        # Always expect an entry for the constructor.
        init_call = self._next_script_call("__init__")
        # The constructor isn't supposed to return anything. The only reason to
        # call it here is to raise an exception if that was specified in the
        # `script`.
        init_call()

    def __str__(self):
        return "{} {}".format(self.__class__.__name__, self._session_count)

    def _next_script_call(self, requested_attribute):
        """
        Return next `Call` object.

        `requested_attribute` is only used for the diagnostic output if the
        script has no more `Call` objects. In this case, the `IndexError` is
        re-raised.
        """
        print(self, "in `_next_script_call`")
        try:
            call = self.script[self._call_index]
        except IndexError:
            print(" *** Ran out of `Call` objects for this session {!r}".format(self))
            print(" Requested attribute was {!r}".format(requested_attribute))
            raise
        self._call_index += 1
        print(self, f"next call: {call!r}")
        return call

    def __getattr__(self, attribute_name):
        """
        Return a dummy method for the session attribute `attribute_name`.

        The next `Call` object is taken from the script at the time of the
        attribute access. When the returned dummy method is called, the call
        is checked against the `Call` object and the `Call`'s result is
        returned or raised.
        """
        # `__getattr__` is only invoked if the regular attribute lookup
        # fails. If the attributes that `_next_script_call` itself reads are
        # missing (for example, for an instance that was created without
        # running `__init__`), taking a script call would re-enter
        # `__getattr__` until a `RecursionError` occurs. Report these names as
        # regular missing attributes instead.
        if attribute_name in ("script", "_call_index"):
            raise AttributeError(attribute_name)
        script_call = self._next_script_call(attribute_name)

        def dummy_method(*args, **kwargs):
            print(self, "in `__getattr__`")
            script_call.check_call(attribute_name, args, kwargs)
            return script_call()

        return dummy_method

    # ----------------------------------------------------------------------
    # `ftplib.FTP` methods that shouldn't be executed with the default
    # processing in `__getattr__`

    def dir(self, path, callback):
        """
        Call the `callback` for each line in the multiline string
        `call.result`.
        """
        script_call = self._next_script_call("dir")
        # Check only the path. This requires that the corresponding `Call`
        # object also solely specifies the path as `args`.
        script_call.check_call("dir", (path,), None)
        # Give `dir` the chance to raise an exception if one was specified in
        # the `Call`'s `result` argument.
        call_result = script_call()
        for line in call_result.splitlines():
            callback(line)

    def _scripted_socket(self, method_name, cmd, rest):
        """
        Return a mock socket for the scripted call of `method_name`.

        The next `Call` object is checked against `method_name` and the
        positional arguments `(cmd, rest)`. The `Call`'s result is used as
        the return value of the mock socket's `makefile` method.
        """
        script_call = self._next_script_call(method_name)
        script_call.check_call(method_name, (cmd, rest), None)
        # Give the call the chance to raise an exception if one was specified
        # in the `Call`'s `result` argument.
        call_result = script_call()
        mock_socket = unittest.mock.Mock(name="socket")
        mock_socket.makefile.return_value = call_result
        return mock_socket

    def ntransfercmd(self, cmd, rest=None):
        """
        Simulate the `ftplib.FTP.ntransfercmd` call.

        `ntransfercmd` returns a tuple of a socket and a size argument. The
        `result` value given when constructing an `ntransfercmd` call specifies
        an `io.StringIO` or `io.BytesIO` value to be used as the
        `Socket.makefile` result.
        """
        mock_socket = self._scripted_socket("ntransfercmd", cmd, rest)
        # Return `None` for size. The docstring of `ftplib.FTP.ntransfercmd`
        # says that's a possibility.
        # TODO: Use a sensible `size` value later if it turns out we need it.
        return mock_socket, None

    def transfercmd(self, cmd, rest=None):
        """
        Simulate the `ftplib.FTP.transfercmd` call.

        `transfercmd` returns a socket. The `result` value given when
        constructing a `transfercmd` call specifies an `io.StringIO` or
        `io.BytesIO` value to be used as the `Socket.makefile` result.
        """
        return self._scripted_socket("transfercmd", cmd, rest)


class MultisessionFactory:
    """
    Return a session factory using the scripted data from the given "scripts"
    for each consecutive call ("creation") of a factory.

    Example:

      host = ftputil.FTPHost(host, user, password,
                             session_factory=scripted_session.factory(script1, script2))

    When the `session_factory` is "instantiated" for the first time by
    `FTPHost._make_session`, the factory object will use the behavior described
    by the script `script1`. When the `session_factory` is "instantiated" a
    second time, the factory object will use the behavior described by the
    script `script2`.
    """

    def __init__(self, *scripts):
        # Every new factory starts numbering its sessions from 1 again.
        ScriptedSession.reset_session_count()
        self._scripts = iter(scripts)
        # All sessions created by this factory, in creation order
        self.scripted_sessions = []

    def __call__(self, host, user, password):
        """
        Call the factory.

        This is equivalent to the constructor of the session (e. g.
        `ftplib.FTP` in a real application).

        The arguments `host`, `user` and `password` are accepted, but not
        used. If there's no script left for another session, the
        `StopIteration` from the script iterator is propagated.
        """
        script = next(self._scripts)
        scripted_session = ScriptedSession(script)
        self.scripted_sessions.append(scripted_session)
        return scripted_session


factory = MultisessionFactory