1#!/usr/bin/env python
2# -*- test-case-name: twisted.mail.test.test_pop3client -*-
3
4# Copyright (c) Twisted Matrix Laboratories.
5# See LICENSE for details.
6
7from twisted.internet.protocol import Factory
8from twisted.protocols import basic
9from twisted.internet import reactor
10import sys, time
11
# Credentials that POP3TestServer accepts in a USER/PASS exchange.
USER = "test"
PASS = "twisted"

# TCP port that main() listens on.
PORT = 1100

# Behaviour switches.  They are module globals read by POP3TestServer while
# it handles each event, and are flipped from the command line by
# processArg().

# Advertise STLS in the capability list and act on the STLS command.
SSL_SUPPORT = True
# Advertise UIDL in the capability list and answer the UIDL command.
UIDL_SUPPORT = True
# Answer commands not handled earlier in lineReceived (e.g. NOOP, STAT,
# LIST, UIDL) with INVALID_RESPONSE.
INVALID_SERVER_RESPONSE = False
# Answer CAPA with INVALID_RESPONSE instead of the capability list.
INVALID_CAPABILITY_RESPONSE = False
# Answer USER with INVALID_RESPONSE.
INVALID_LOGIN_RESPONSE = False
# Close every connection as soon as it is made, without a greeting.
DENY_CONNECTION = False
# Close the connection as soon as the client sends a line.
DROP_CONNECTION = False
# Set by the 'bad_tls' command line argument.
BAD_TLS_RESPONSE = False
# Never answer any line received from the client.
TIMEOUT_RESPONSE = False
# Never answer STAT, LIST or UIDL once the client is logged in.
TIMEOUT_DEFERRED = False
# Delay the greeting by 20 seconds after the connection is made.
SLOW_GREETING = False
28
29"""Commands"""
# Greeting sent when a client connects.
CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"

# Capabilities that are always listed in a CAPA response.
CAPABILITIES = [
    "TOP",
    "LOGIN-DELAY 180",
    "USER",
    "SASL LOGIN",
]

# Capabilities listed only when the matching *_SUPPORT switch is on.
CAPABILITIES_SSL = "STLS"
CAPABILITIES_UIDL = "UIDL"


# Canned single-line responses.  Line terminators are added by sendLine().
INVALID_RESPONSE = "-ERR Unknown request"
VALID_RESPONSE = "+OK Command Completed"
AUTH_DECLINED = "-ERR LOGIN failed"
AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
TLS_ERROR = "-ERR server side error start TLS handshake"
LOGOUT_COMPLETE = "+OK quit completed"
# Sent for any other command while the client is not logged in
# (spelling of the state name corrected from "AUHORIZATION").
NOT_LOGGED_IN = "-ERR Unknown AUTHORIZATION state command"
STAT = "+OK 0 0"
# Multi-line responses for an empty mailbox: status line followed directly by
# the terminating "." line.
UIDL = "+OK Unique-ID listing follows\r\n."
LIST = "+OK Mailbox scan listing follows\r\n."
# First line of a CAPA response; the capability lines follow it.
CAP_START = "+OK Capability list follows:"
54
55
class POP3TestServer(basic.LineReceiver):
    """
    A scripted POP3 server with an empty mailbox, for exercising POP3 clients.

    Every command is answered with a canned response taken from the module
    constants.  The module-level switches (C{SSL_SUPPORT}, C{UIDL_SUPPORT},
    C{DROP_CONNECTION}, ...) are consulted each time an event is handled, so
    they select which failure mode the server simulates.

    @ivar loggedIn: C{True} after a PASS command matched C{USER}/C{PASS};
        reset by QUIT.
    @ivar caps: the lines of the most recently sent CAPA response, or
        C{None} if none has been sent yet.
    @ivar tmpUser: the name given in the last USER command, or C{None}.
    @ivar ctx: the context factory handed to C{transport.startTLS}, or
        C{None} if none is available.
    """

    def __init__(self, contextFactory = None):
        """
        @param contextFactory: optional context factory to use for STLS.  If
            omitted, one is created on demand by L{getContext}.
        """
        self.loggedIn = False
        self.caps = None
        self.tmpUser = None
        self.ctx = contextFactory

    def sendSTATResp(self, req):
        """
        Send the canned STAT response.  C{req} is ignored.
        """
        self.sendLine(STAT)

    def sendUIDLResp(self, req):
        """
        Send the canned (empty) UIDL listing.  C{req} is ignored.
        """
        self.sendLine(UIDL)

    def sendLISTResp(self, req):
        """
        Send the canned (empty) LIST listing.  C{req} is ignored.
        """
        self.sendLine(LIST)

    def sendCapabilities(self):
        """
        Send the CAPA response: C{CAP_START}, the optional UIDL and STLS
        capabilities, the fixed C{CAPABILITIES}, then the terminating ".".

        The list is rebuilt from scratch on every call.  Appending to the
        list kept on the instance would repeat every capability when a
        client issues CAPA more than once on the same connection.
        """
        caps = [CAP_START]

        if UIDL_SUPPORT:
            caps.append(CAPABILITIES_UIDL)

        if SSL_SUPPORT:
            caps.append(CAPABILITIES_SSL)

        caps.extend(CAPABILITIES)

        # Keep the last response available on the instance, as before.
        self.caps = caps
        self.sendLine('\r\n'.join(caps) + "\r\n.")

    def connectionMade(self):
        """
        Greet the client, or simulate a refused or slow server depending on
        C{DENY_CONNECTION} and C{SLOW_GREETING}.
        """
        if DENY_CONNECTION:
            self.disconnect()
            return

        if SLOW_GREETING:
            reactor.callLater(20, self.sendGreeting)
        else:
            self.sendGreeting()

    def sendGreeting(self):
        """
        Send the server greeting line.
        """
        self.sendLine(CONNECTION_MADE)

    def lineReceived(self, line):
        """
        Answer one line from the client with its canned response.

        Commands are recognised by a case-insensitive substring match
        anywhere in C{line} and are tested in this order: CAPA, STLS, USER,
        PASS, QUIT, then (subject to C{INVALID_SERVER_RESPONSE} and to being
        logged in) NOOP, STAT, LIST and UIDL.  A line that matches nothing
        while logged in gets no response.

        @param line: the line received, without its terminator.
        """
        uline = line.upper()

        if TIMEOUT_RESPONSE:
            # Do not respond to clients request
            return

        if DROP_CONNECTION:
            self.disconnect()
            return

        if "CAPA" in uline:
            if INVALID_CAPABILITY_RESPONSE:
                self.sendLine(INVALID_RESPONSE)
            else:
                self.sendCapabilities()

        elif "STLS" in uline and SSL_SUPPORT:
            self.startTLS()

        elif "USER" in uline:
            self._handleUser(line)

        elif "PASS" in uline:
            self._handlePass(line)

        elif "QUIT" in uline:
            self.loggedIn = False
            self.sendLine(LOGOUT_COMPLETE)
            self.disconnect()

        elif INVALID_SERVER_RESPONSE:
            self.sendLine(INVALID_RESPONSE)

        elif not self.loggedIn:
            self.sendLine(NOT_LOGGED_IN)

        elif "NOOP" in uline:
            self.sendLine(VALID_RESPONSE)

        elif "STAT" in uline:
            if not TIMEOUT_DEFERRED:
                self.sendLine(STAT)

        elif "LIST" in uline:
            if not TIMEOUT_DEFERRED:
                self.sendLine(LIST)

        elif "UIDL" in uline:
            if TIMEOUT_DEFERRED:
                return
            if UIDL_SUPPORT:
                self.sendLine(UIDL)
            else:
                self.sendLine(INVALID_RESPONSE)

    def _handleUser(self, line):
        """
        Remember the user name from a USER line and acknowledge it.
        """
        if INVALID_LOGIN_RESPONSE:
            self.sendLine(INVALID_RESPONSE)
            return

        try:
            self.tmpUser = line.split(" ")[1]
        except IndexError:
            # No argument after the command word.
            self.sendLine(AUTH_DECLINED)
        else:
            self.sendLine(VALID_RESPONSE)

    def _handlePass(self, line):
        """
        Check the password from a PASS line against the remembered user.
        """
        try:
            pwd = line.split(" ")[1]
        except IndexError:
            # No argument after the command word.
            self.sendLine(AUTH_DECLINED)
            return

        # tmpUser is None until a USER command has been seen, so a PASS
        # without a preceding USER is declined by this comparison too.
        if self.tmpUser == USER and pwd == PASS:
            self.loggedIn = True
            self.sendLine(AUTH_ACCEPTED)
        else:
            self.sendLine(AUTH_DECLINED)

    def startTLS(self):
        """
        Answer an STLS command and, when possible, switch the transport to
        TLS.

        When C{BAD_TLS_RESPONSE} is set (the C{bad_tls} command line mode)
        the server answers with C{TLS_ERROR} and does not start TLS.
        """
        if BAD_TLS_RESPONSE:
            self.sendLine(TLS_ERROR)
            return

        if self.ctx is None:
            self.getContext()

        if SSL_SUPPORT and self.ctx is not None:
            self.sendLine('+OK Begin TLS negotiation now')
            self.transport.startTLS(self.ctx)
        else:
            self.sendLine('-ERR TLS not available')

    def disconnect(self):
        """
        Close the connection to the client.
        """
        self.transport.loseConnection()

    def getContext(self):
        """
        Create the context factory used for STLS and store it in C{self.ctx}.

        C{self.ctx} is set to C{None} if L{twisted.internet.ssl} cannot be
        imported.
        """
        try:
            from twisted.internet import ssl
        except ImportError:
            self.ctx = None
        else:
            # NOTE(review): this is a client context factory used on the
            # server side of the connection -- confirm a handshake can
            # actually complete with it.
            self.ctx = ssl.ClientContextFactory()
            self.ctx.method = ssl.SSL.TLSv1_METHOD
212
213
# Help text printed by processArg() for '--help' or an unknown argument.
# The command names are the POP3 ones this server actually handles
# (CAPA, USER, STLS, STAT/LIST/UIDL).
usage = """popServer.py [arg] (default is Standard POP Server with no messages)
no_ssl  - Start with no SSL support
no_uidl - Start with no UIDL support
bad_resp - Send a non-RFC compliant response to the Client
bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPA' request
bad_login_resp - send a non-RFC compliant response when the Client sends a 'USER' request
deny - Deny the connection
drop - Drop the connection after sending the greeting
bad_tls - Send a bad response to a STLS
timeout - Do not return a response to a Client request
to_deferred - Do not return a response on a 'STAT', 'LIST' or 'UIDL' request. This
              will test Deferred callback handling
slow - Wait 20 seconds after the connection is made to return a Server Greeting
"""
228
def printMessage(msg):
    """
    Announce on standard output which mode the server is starting in.

    @param msg: short name of the mode, inserted into the announcement.
    """
    # A single parenthesised argument prints the same text whether print is
    # a statement (Python 2) or a function (Python 3).
    print("Server Starting in %s mode" % msg)
231
# Command line argument -> (name of the module switch to set, value to give
# it, mode name passed to printMessage).
_ARG_MODES = {
    'no_ssl': ('SSL_SUPPORT', False, "NON-SSL"),
    'no_uidl': ('UIDL_SUPPORT', False, "NON-UIDL"),
    'bad_resp': ('INVALID_SERVER_RESPONSE', True, "Invalid Server Response"),
    'bad_cap_resp': ('INVALID_CAPABILITY_RESPONSE', True,
                     "Invalid Capability Response"),
    'bad_login_resp': ('INVALID_LOGIN_RESPONSE', True,
                       "Invalid Login Response"),
    'deny': ('DENY_CONNECTION', True, "Deny Connection"),
    'drop': ('DROP_CONNECTION', True, "Drop Connection"),
    'bad_tls': ('BAD_TLS_RESPONSE', True, "Bad TLS Response"),
    'timeout': ('TIMEOUT_RESPONSE', True, "Timeout Response"),
    'to_deferred': ('TIMEOUT_DEFERRED', True, "Timeout Deferred Response"),
    'slow': ('SLOW_GREETING', True, "Slow Greeting"),
}


def processArg(arg):
    """
    Apply one command line argument by setting the matching module switch.

    The argument is matched case-insensitively.  For a recognised mode the
    switch is set and the mode is announced through L{printMessage}.  For
    C{--help} or any unrecognised argument the usage text is printed and the
    process exits via C{sys.exit()}.

    @param arg: a single command line argument.
    """
    mode = _ARG_MODES.get(arg.lower())

    if mode is None:
        # '--help' and unknown arguments are treated alike.
        print(usage)
        sys.exit()

    flagName, value, description = mode
    # The switches are module globals read by POP3TestServer at run time.
    globals()[flagName] = value
    printMessage(description)
297
def main():
    """
    Apply the command line arguments, then serve POP3TestServer on PORT
    until the reactor stops.
    """
    cliArgs = sys.argv[1:]

    if not cliArgs:
        # No mode requested: run as the plain, empty-mailbox server.
        printMessage("POP3 with no messages")

    for cliArg in cliArgs:
        processArg(cliArg)

    factory = Factory()
    factory.protocol = POP3TestServer
    reactor.listenTCP(PORT, factory)
    reactor.run()
312
# Start the server only when this file is run as a script, not on import.
if __name__ == '__main__':
    main()
315