1#!/usr/bin/env python
2#!/Python27/python
3
4""" -*- Mode: Python; tab-width: 4 -*-
5    Author: Kristina Simpson (Krista^)
6
7    Run as follows.
8    Standalone mode: python lobby-server.py [<[-p|--port=] port number>] <game server url>
9    Application server mode: twistd -y lobby-server.py [<[-p|--port=] port number>] <game server url>
10"""
11from twisted.internet import reactor
12from twisted.web import server, resource, http
13from twisted.web.server import Site, NOT_DONE_YET
14from random import randint
15from pprint import pprint
16from optparse import OptionParser, OptionValueError
17from time import strftime, time, localtime, sleep
18from threading import Thread, RLock
19import httplib, json
20from urlparse import urlparse
21import socket
22from copy import deepcopy
23from collections import deque
24from random import choice
25import logging
26
# How long a parked get_status request may wait for messages, in seconds.
REQUEST_TIMEOUT = 240
# Idle time after which a session is marked expired, in seconds. Kept longer
# than REQUEST_TIMEOUT so a waiting long poll never outlives its session.
SESSION_TIMEOUT = REQUEST_TIMEOUT + 30
# Time an expired session lingers before its queue is dropped, in seconds.
SESSION_REMOVE_TIMEOUT = 60 * 60

# Console handler shared by the whole module; emits everything from DEBUG up.
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
ch.setLevel(logging.DEBUG)

# Module logger, wired to the console handler above.
logger = logging.getLogger('lobby-server')
logger.addHandler(ch)
logger.setLevel(logging.DEBUG)
41
class QueueManager:
    """ Thread-safe registry of per-user queues, keyed by username.

        Each queue is a plain dict with the keys:
          'clients'  -- {'session_id', 'name', 'ip_chain'}; session_id is -1
                        until a session has been established for the user.
          'messages' -- dict of messages waiting for the user.
          'requests' -- list of requests parked for the user.
          'timeouts' -- dict of named timers belonging to the queue.
          'lock'     -- RLock guarding the contents of the queue.
    """
    def __init__(self):
        self._queue = {}
        self._queue_lock = RLock()
    def get_queue(self, name):
        """ Return the queue registered under name, creating an empty one on first use. """
        with self._queue_lock:
            if name not in self._queue:
                logger.debug('Creating new queue for %s', name)
                self._queue[name] = {
                    'clients': {'session_id': -1, 'name': name, 'ip_chain': None},
                    'messages': {},
                    'requests': [],
                    'timeouts': {},
                    'lock': RLock(),
                }
            return self._queue[name]
    def has_key(self, name):
        """ Return True when a queue is registered under name. """
        with self._queue_lock:
            return name in self._queue
    def remove_queue(self, q):
        """ Unregister queue q, looked up by the name stored in q['clients']['name'].

            Raises AssertionError when no queue is registered under that name.
        """
        with self._queue_lock:
            name = q['clients']['name']
            # %s rather than %d: the stored session id may be a client-supplied
            # value of any JSON type, and a formatting error here must not
            # prevent the removal.
            logger.debug('Removing session for %s(%s)', name, q['clients']['session_id'])
            assert name in self._queue, 'Attempted delete of non-key %s' % name
            del self._queue[name]
62
# Module-wide registry of per-user queues, shared by the request handlers below.
queue_manager = QueueManager()
64
def make_session_id():
    """ Return a random session id in the inclusive range 1 .. 2**31.

        The id is the only thing tying later requests to a login, so it is
        drawn from the operating system's entropy source rather than from
        the default, predictable pseudo-random generator.
    """
    # Imported locally so this function does not depend on the file's import block.
    from random import SystemRandom
    return SystemRandom().randint(1, 2**31)
66
class GameServerStatusQueryThread(Thread):
    """ Background thread that polls the game server for status updates.

        The thread repeatedly POSTs a 'get_status' message to the game server
        URL and remembers the returned 'status_id' so the next poll can pass it
        back as 'last_seen'. ajax_post() sends an arbitrary JSON message to the
        same URL on behalf of the caller.
    """
    def __init__(self, game_server_url):
        """ game_server_url is the full URL of the game server; its host[:port]
            and path parts are used for every request.
        """
        Thread.__init__(self)
        self._last_seen = 0
        self._headers = {"Content-type":"application/json", "Accept":"text/json, application/json"}
        self._abort = False
        o = urlparse(game_server_url)
        self._game_server = o.netloc
        self._game_server_path = o.path
        self._stats_lock = RLock()
        self._game_stats = {'total_games_being_played': 0, 'played':{}}
    def run(self):
        """ Poll the game server until abort() is called. """
        # Checking the flag in the loop condition means no further request is
        # started once abort has been requested.
        while not self._abort:
            pause = True
            conn = None
            try:
                conn = httplib.HTTPConnection(self._game_server, timeout = 30)
                params = json.dumps({'type': "get_status", 'last_seen': self._last_seen})
                conn.request("POST", self._game_server_path, params, self._headers)
                response = conn.getresponse()
                if response.status == 200:
                    data = json.loads(response.read())
                    self._last_seen = data['status_id']
                    if 'games' in data:
                        self.parse_game_data(data['games'])
                    else:
                        logger.warning("Status packet received without 'games' data")
            except socket.timeout:
                # The timeout itself was the wait; poll again straight away.
                pause = False
                logger.warning('Timeout waiting for current status from game server')
            except Exception as e:
                logger.error('Error while querying game server status: %s', e)
            finally:
                # Always release the connection, including when the request failed.
                if conn is not None:
                    conn.close()
            if pause and not self._abort:
                sleep(10)
    def abort(self):
        """ Ask the polling loop to stop and block until the thread has exited. """
        self._abort = True
        self.join()
    def parse_game_data(self, games):
        """ Parses the game data returned from the server, data consists of a lists of items
            such as:
            {u'id': 1327454801, u'started': True, u'type': u'game_info'}
            type is always game_info (currently)
            id is always the id returned when the game was created.
            started is either True or False

            Not implemented yet: the data is accepted but the statistics
            returned by get_game_stats() are left unchanged.
        """
        # todo: go through the lobby_games_list and any games that aren't in the list from the game
        # server need to be removed and the references to them in the lobby_sessions need to be
        # removed.
        pass
    def get_game_stats(self):
        """ Return an independent copy of the current game statistics. """
        with self._stats_lock:
            game_stats = deepcopy(self._game_stats)
        return game_stats
    def ajax_post(self, msg):
        """ POST msg (JSON-serialisable) to the game server.

            Returns a (completion, data) tuple. On an HTTP 200 reply completion
            is True and data is the decoded JSON body; otherwise completion is
            False and data is {'error': <description>}.
        """
        conn = httplib.HTTPConnection(self._game_server, timeout = 30)
        params = json.dumps(msg)
        try:
            conn.request("POST", self._game_server_path, params, self._headers)
            response = conn.getresponse()
            if response.status == 200:
                return True, json.loads(response.read())
            return False, {'error':('%d: %s' % ( response.status, response.reason ))}
        except socket.timeout:
            logger.warning('Timeout waiting for message response from game server')
            return False, {'error': 'Timeout waiting for message response from game server'}
        except Exception as e:
            return False, {'error': str(e)}
        finally:
            # Release the connection on every path, not just the successful one.
            conn.close()
143
class LobbyHandler(resource.Resource):
    """ Twisted web resource that answers the lobby's JSON requests.

        Every request body is a JSON object carrying 'type', 'username' and
        'session_id'. The supported types are 'login' and 'get_status'; the
        latter is a long poll that is parked for up to REQUEST_TIMEOUT seconds
        when no messages are waiting for the user.
    """
    isLeaf = True
    def __init__(self, gsq):
        """ gsq is the game server query thread, or None when no game server was given. """
        resource.Resource.__init__(self)
        self._gsq = gsq
        self._requests = []
        self._messages = {}
    def cleanup(self):
        """ Shutdown hook; there is nothing to release at present. """
        pass
    def _restart_session_timer(self, q):
        """ (Re)arm the idle timer of q so that session_timeout fires after SESSION_TIMEOUT.

            Any pending timer is cancelled and replaced rather than reset: once a
            session has expired the pending timer is the removal timer, and
            resetting it would still delete the queue of a session that has
            just been revived.
        """
        with q['lock']:
            pending = q['timeouts'].get('client')
            if pending is not None and pending.active():
                pending.cancel()
            q['timeouts']['client'] = reactor.callLater(SESSION_TIMEOUT, self.session_timeout, q)
    def _delayedRender(self, q):
        """ Answer every request parked on q with an empty status reply.

            Runs when the long-poll timer fires, and is also called directly to
            release an older parked request before a new one is parked.
        """
        with q['lock']:
            delay = q['timeouts'].pop('delay', None)
            # Not active when we are being run by the timer itself.
            if delay is not None and delay.active():
                delay.cancel()
            parked = q['requests'][:]
            del q['requests'][:]
        reply = json.dumps({'request':'success', 'type': 'status', 'status_id':0, 'messages':[]})
        for request in parked:
            request.write(reply)
            request.finish()
    def _responseFailed(self, err, q, request=None):
        """ Errback for a parked request whose connection failed before it was answered.

            Forgets the request so nothing is written to it later, and cancels the
            long-poll timer once no parked request remains.
        """
        logger.error('_responseFailed for %s(%s) -- %s', q['clients']['name'], q['clients']['session_id'], err)
        with q['lock']:
            if request is not None and request in q['requests']:
                q['requests'].remove(request)
            if not q['requests']:
                delay = q['timeouts'].pop('delay', None)
                if delay is not None and delay.active():
                    delay.cancel()
    def render_GET(self, request):
        """ GET requests are handled exactly like POST requests. """
        return self.render_POST(request)
    def render_POST(self, request):
        """ Validate the JSON request and dispatch on its 'type'.

            Returns the JSON reply as a string, or NOT_DONE_YET when a get_status
            request has been parked. Requests that fail validation are answered
            with 'request': 'failed' and an 'error' description.
        """
        request.setHeader('content-type', 'application/json')
        request.setHeader('Access-Control-Allow-Origin','*')
        ip_chain = request.requestHeaders.getRawHeaders('x-forwarded-for')
        try:
            # read() works for in-memory and for file-backed request bodies.
            req_data = json.loads(request.content.read())
        except ValueError:
            req_data = None
        if not isinstance(req_data, dict):
            logger.error('Rejected request, body is not a JSON object')
            return json.dumps({'request':'failed', 'error':'Request body is not a JSON object.'})
        logger.debug(req_data)

        required = (
            ('type', 'bad_type'),
            ('username', 'No username present.'),
            ('session_id', 'No session_id present.'),
        )
        for field, error in required:
            if field not in req_data:
                req_data['request'] = 'failed'
                req_data['error'] = error
                logger.error('Rejected request, no %s', field)
                return json.dumps(req_data)
        req_type = req_data['type']
        username = req_data['username']
        session_id = req_data['session_id']

        if req_type == 'login':
            # The password is not checked yet, so a missing one is not an error.
            resp = self.do_login(username, session_id, req_data.get('password'), ip_chain)
        elif req_type == 'get_status':
            return self.get_status(username, session_id, req_data.get('last_seen', 0), request)
        else:
            # Catch all
            resp = req_data
            resp['request'] = 'failed'
            resp['error'] = 'Unknown "type" of command'
        return json.dumps(resp)
    def do_login(self, username, session, password, ip_chain):
        """ Create, refresh or validate the session of username.

            session is the id the client believes it owns, or -1 when it has none.
            Returns the reply dict: on success it carries the session id to use,
            otherwise 'request': 'failed' with an 'error' description.
        """
        # passwords ignored for now, i.e. all logins are guest logins.
        q = queue_manager.get_queue(username)
        with q['lock']:
            client = q['clients']
            current = client['session_id']
            if current == -1:
                if session == -1:
                    client['session_id'] = make_session_id()
                    note = 'New session'
                else:
                    # looks like session has expired, we'll let them keep their session_id
                    client['session_id'] = session
                    note = 'Refresh expired session'
                client['ip_chain'] = ip_chain
            elif current == session:
                # Same session, just validate
                client['ip_chain'] = ip_chain
                note = 'Existing Session'
            elif client['ip_chain'] == ip_chain:
                # No or different session id, but the request comes from the same address.
                if session == -1:
                    note = 'Existing Session'
                else:
                    note = 'Existing session (id\'s differ ip\'s same)'
            else:
                logger.error('Reject session IP\'s differ %s(%s)', client['name'], current)
                return { 'request': 'failed', 'error':'User already logged in from different IP', 'type':'login' }
            self._restart_session_timer(q)
            logger.debug('%s %s(%s)', note, client['name'], client['session_id'])
            return {'request':'success', 'session_id':client['session_id'], 'type':'login'}
    def session_timeout(self, q):
        """ Mark the session of q as expired and schedule removal of the queue. """
        with q['lock']:
            logger.debug('Session timeout %s(%s)', q['clients']['name'], q['clients']['session_id'])
            q['clients']['session_id'] = -1
            q['timeouts']['client'] = reactor.callLater(SESSION_REMOVE_TIMEOUT, self.session_remove, q)
    def session_remove(self, q):
        """ Drop the queue of a session that stayed expired for SESSION_REMOVE_TIMEOUT. """
        logger.debug('Session remove %s(%s)', q['clients']['name'], q['clients']['session_id'])
        queue_manager.remove_queue(q)
    def get_status(self, username, session_id, last_seen, request):
        """ Return the messages of username that are newer than last_seen.

            When there are none the request is parked and NOT_DONE_YET is
            returned; the request is then answered with an empty message list
            after REQUEST_TIMEOUT seconds.
        """
        if not queue_manager.has_key(username):
            logger.debug('Reject get_status -- user not logged in %s(%s)', username, session_id)
            return json.dumps({ 'request':'failed', 'error':'No session exists for %s. Please login first.' % username })
        q = queue_manager.get_queue(username)
        with q['lock']:
            current = q['clients']['session_id']
            # -1 marks an expired session and must never validate, even when
            # the client sends -1 itself.
            if current == -1 or current != session_id:
                logger.debug('Reject get_status session id\'s differ %s(%s) -- %s', q['clients']['name'], current, session_id)
                return json.dumps({ 'request':'failed', 'error':'Non-matching session_id'})
            msgs = q['messages']
            # Sorted so messages are delivered in id order.
            new_ids = sorted(k for k in msgs if last_seen < k)
            msg_list = [msgs[k] for k in new_ids]
            self._restart_session_timer(q)
            # Release an older parked request (and its timer) before going on.
            self._delayedRender(q)
            if not msg_list:
                # delay till later and try again
                q['requests'].append(request)
                q['timeouts']['delay'] = reactor.callLater(REQUEST_TIMEOUT, self._delayedRender, q)
                request.notifyFinish().addErrback(self._responseFailed, q, request)
                logger.debug('Deferred get_status response %s(%s)', q['clients']['name'], current)
                return NOT_DONE_YET
            # Some messages so just send them.
            logger.debug('get_status returns %d messages %s(%s)', len(msg_list), q['clients']['name'], current)
            return json.dumps({ 'request':'success', 'type':'status', 'status_id':new_ids[-1], 'messages':msg_list })
315
def main(options, args):
    """ Run the lobby server in standalone mode until the reactor stops.

        options.port is the TCP port to listen on. args[0], when present, is
        the game server URL polled by a background thread.
    """
    gsq = None
    if len(args) > 0:
        gsq = GameServerStatusQueryThread(args[0])
        gsq.start()
    lobby_handler = LobbyHandler(gsq)
    try:
        # A port given on the command line may arrive as a string.
        reactor.listenTCP(int(options.port), Site(lobby_handler))
        reactor.run()
    finally:
        # Also reached when listening fails, so the polling thread is always
        # stopped and cannot keep the process alive.
        lobby_handler.cleanup()
        if gsq: gsq.abort()
329
def get_opts(argv=None):
    """ Parse the command line and return the (options, args) pair.

        argv is the list of arguments to parse; the process command line is
        used when it is None. options.port is always an integer and
        options.verbose a boolean; args holds the positional arguments.
    """
    usage = 'usage: %prog <options> <game_server:port>'
    parser = OptionParser(usage)
    parser.add_option('-v', '--verbose', action='store_true', default=False, dest='verbose')
    # type='int' so a port given on the command line matches the integer default.
    parser.add_option('-p', '--port', action='store', type='int', default=8181, dest='port')
    return parser.parse_args(argv)
336
if  __name__ == '__main__':
    # Running standalone mode not as a service.
    options, args = get_opts()
    main(options, args)
else:
    # run under twistd using -y
    # Imported here because only this branch needs the application framework.
    from twisted.application.internet import TCPServer
    from twisted.application.service import Application
    # NOTE(review): this parses the command line of the hosting process, not
    # one of our own -- confirm the options reach us as intended under twistd.
    options, args = get_opts()
    gsq = None
    if len(args) > 0:
        gsq = GameServerStatusQueryThread(args[0])
        gsq.start()
    # Serve the same resource as standalone mode does.
    application = Application('Lobby Service')
    TCPServer(int(options.port), Site(LobbyHandler(gsq))).setServiceParent(application)
350