1#!/usr/bin/env python 2# -*- test-case-name: twisted.mail.test.test_pop3client -*- 3 4# Copyright (c) Twisted Matrix Laboratories. 5# See LICENSE for details. 6 7from twisted.internet.protocol import Factory 8from twisted.protocols import basic 9from twisted.internet import reactor 10import sys, time 11 12USER = "test" 13PASS = "twisted" 14 15PORT = 1100 16 17SSL_SUPPORT = True 18UIDL_SUPPORT = True 19INVALID_SERVER_RESPONSE = False 20INVALID_CAPABILITY_RESPONSE = False 21INVALID_LOGIN_RESPONSE = False 22DENY_CONNECTION = False 23DROP_CONNECTION = False 24BAD_TLS_RESPONSE = False 25TIMEOUT_RESPONSE = False 26TIMEOUT_DEFERRED = False 27SLOW_GREETING = False 28 29"""Commands""" 30CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready" 31 32CAPABILITIES = [ 33"TOP", 34"LOGIN-DELAY 180", 35"USER", 36"SASL LOGIN" 37] 38 39CAPABILITIES_SSL = "STLS" 40CAPABILITIES_UIDL = "UIDL" 41 42 43INVALID_RESPONSE = "-ERR Unknown request" 44VALID_RESPONSE = "+OK Command Completed" 45AUTH_DECLINED = "-ERR LOGIN failed" 46AUTH_ACCEPTED = "+OK Mailbox open, 0 messages" 47TLS_ERROR = "-ERR server side error start TLS handshake" 48LOGOUT_COMPLETE = "+OK quit completed" 49NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command" 50STAT = "+OK 0 0" 51UIDL = "+OK Unique-ID listing follows\r\n." 52LIST = "+OK Mailbox scan listing follows\r\n." 53CAP_START = "+OK Capability list follows:" 54 55 56class POP3TestServer(basic.LineReceiver): 57 def __init__(self, contextFactory = None): 58 self.loggedIn = False 59 self.caps = None 60 self.tmpUser = None 61 self.ctx = contextFactory 62 63 def sendSTATResp(self, req): 64 self.sendLine(STAT) 65 66 def sendUIDLResp(self, req): 67 self.sendLine(UIDL) 68 69 def sendLISTResp(self, req): 70 self.sendLine(LIST) 71 72 def sendCapabilities(self): 73 if self.caps is None: 74 self.caps = [CAP_START] 75 76 if UIDL_SUPPORT: 77 self.caps.append(CAPABILITIES_UIDL) 78 79 if SSL_SUPPORT: 80 self.caps.append(CAPABILITIES_SSL) 81 82 for cap in CAPABILITIES: 83 self.caps.append(cap) 84 resp = '\r\n'.join(self.caps) 85 resp += "\r\n." 86 87 self.sendLine(resp) 88 89 90 def connectionMade(self): 91 if DENY_CONNECTION: 92 self.disconnect() 93 return 94 95 if SLOW_GREETING: 96 reactor.callLater(20, self.sendGreeting) 97 98 else: 99 self.sendGreeting() 100 101 def sendGreeting(self): 102 self.sendLine(CONNECTION_MADE) 103 104 def lineReceived(self, line): 105 """Error Conditions""" 106 107 uline = line.upper() 108 find = lambda s: uline.find(s) != -1 109 110 if TIMEOUT_RESPONSE: 111 # Do not respond to clients request 112 return 113 114 if DROP_CONNECTION: 115 self.disconnect() 116 return 117 118 elif find("CAPA"): 119 if INVALID_CAPABILITY_RESPONSE: 120 self.sendLine(INVALID_RESPONSE) 121 else: 122 self.sendCapabilities() 123 124 elif find("STLS") and SSL_SUPPORT: 125 self.startTLS() 126 127 elif find("USER"): 128 if INVALID_LOGIN_RESPONSE: 129 self.sendLine(INVALID_RESPONSE) 130 return 131 132 resp = None 133 try: 134 self.tmpUser = line.split(" ")[1] 135 resp = VALID_RESPONSE 136 except: 137 resp = AUTH_DECLINED 138 139 self.sendLine(resp) 140 141 elif find("PASS"): 142 resp = None 143 try: 144 pwd = line.split(" ")[1] 145 146 if self.tmpUser is None or pwd is None: 147 resp = AUTH_DECLINED 148 elif self.tmpUser == USER and pwd == PASS: 149 resp = AUTH_ACCEPTED 150 self.loggedIn = True 151 else: 152 resp = AUTH_DECLINED 153 except: 154 resp = AUTH_DECLINED 155 156 self.sendLine(resp) 157 158 elif find("QUIT"): 159 self.loggedIn = False 160 self.sendLine(LOGOUT_COMPLETE) 161 self.disconnect() 162 163 elif INVALID_SERVER_RESPONSE: 164 self.sendLine(INVALID_RESPONSE) 165 166 elif not self.loggedIn: 167 self.sendLine(NOT_LOGGED_IN) 168 169 elif find("NOOP"): 170 self.sendLine(VALID_RESPONSE) 171 172 elif find("STAT"): 173 if TIMEOUT_DEFERRED: 174 return 175 self.sendLine(STAT) 176 177 elif find("LIST"): 178 if TIMEOUT_DEFERRED: 179 return 180 self.sendLine(LIST) 181 182 elif find("UIDL"): 183 if TIMEOUT_DEFERRED: 184 return 185 elif not UIDL_SUPPORT: 186 self.sendLine(INVALID_RESPONSE) 187 return 188 189 self.sendLine(UIDL) 190 191 def startTLS(self): 192 if self.ctx is None: 193 self.getContext() 194 195 if SSL_SUPPORT and self.ctx is not None: 196 self.sendLine('+OK Begin TLS negotiation now') 197 self.transport.startTLS(self.ctx) 198 else: 199 self.sendLine('-ERR TLS not available') 200 201 def disconnect(self): 202 self.transport.loseConnection() 203 204 def getContext(self): 205 try: 206 from twisted.internet import ssl 207 except ImportError: 208 self.ctx = None 209 else: 210 self.ctx = ssl.ClientContextFactory() 211 self.ctx.method = ssl.SSL.TLSv1_METHOD 212 213 214usage = """popServer.py [arg] (default is Standard POP Server with no messages) 215no_ssl - Start with no SSL support 216no_uidl - Start with no UIDL support 217bad_resp - Send a non-RFC compliant response to the Client 218bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPABILITY' request 219bad_login_resp - send a non-RFC compliant response when the Client sends a 'LOGIN' request 220deny - Deny the connection 221drop - Drop the connection after sending the greeting 222bad_tls - Send a bad response to a STARTTLS 223timeout - Do not return a response to a Client request 224to_deferred - Do not return a response on a 'Select' request. This 225 will test Deferred callback handling 226slow - Wait 20 seconds after the connection is made to return a Server Greeting 227""" 228 229def printMessage(msg): 230 print "Server Starting in %s mode" % msg 231 232def processArg(arg): 233 234 if arg.lower() == 'no_ssl': 235 global SSL_SUPPORT 236 SSL_SUPPORT = False 237 printMessage("NON-SSL") 238 239 elif arg.lower() == 'no_uidl': 240 global UIDL_SUPPORT 241 UIDL_SUPPORT = False 242 printMessage("NON-UIDL") 243 244 elif arg.lower() == 'bad_resp': 245 global INVALID_SERVER_RESPONSE 246 INVALID_SERVER_RESPONSE = True 247 printMessage("Invalid Server Response") 248 249 elif arg.lower() == 'bad_cap_resp': 250 global INVALID_CAPABILITY_RESPONSE 251 INVALID_CAPABILITY_RESPONSE = True 252 printMessage("Invalid Capability Response") 253 254 elif arg.lower() == 'bad_login_resp': 255 global INVALID_LOGIN_RESPONSE 256 INVALID_LOGIN_RESPONSE = True 257 printMessage("Invalid Capability Response") 258 259 elif arg.lower() == 'deny': 260 global DENY_CONNECTION 261 DENY_CONNECTION = True 262 printMessage("Deny Connection") 263 264 elif arg.lower() == 'drop': 265 global DROP_CONNECTION 266 DROP_CONNECTION = True 267 printMessage("Drop Connection") 268 269 270 elif arg.lower() == 'bad_tls': 271 global BAD_TLS_RESPONSE 272 BAD_TLS_RESPONSE = True 273 printMessage("Bad TLS Response") 274 275 elif arg.lower() == 'timeout': 276 global TIMEOUT_RESPONSE 277 TIMEOUT_RESPONSE = True 278 printMessage("Timeout Response") 279 280 elif arg.lower() == 'to_deferred': 281 global TIMEOUT_DEFERRED 282 TIMEOUT_DEFERRED = True 283 printMessage("Timeout Deferred Response") 284 285 elif arg.lower() == 'slow': 286 global SLOW_GREETING 287 SLOW_GREETING = True 288 printMessage("Slow Greeting") 289 290 elif arg.lower() == '--help': 291 print usage 292 sys.exit() 293 294 else: 295 print usage 296 sys.exit() 297 298def main(): 299 300 if len(sys.argv) < 2: 301 printMessage("POP3 with no messages") 302 else: 303 args = sys.argv[1:] 304 305 for arg in args: 306 processArg(arg) 307 308 f = Factory() 309 f.protocol = POP3TestServer 310 reactor.listenTCP(PORT, f) 311 reactor.run() 312 313if __name__ == '__main__': 314 main() 315