1#!/usr/bin/env python 2#!/Python27/python 3 4""" -*- Mode: Python; tab-width: 4 -*- 5 Author: Kristina Simpson (Krista^) 6 7 Run as follows. 8 Standalone mode: python lobby-server.py [<[-p|--port=] port number>] <game server url> 9 Application server mode: twistd -y lobby-server.py [<[-p|--port=] port number>] <game server url> 10""" 11from twisted.internet import reactor 12from twisted.web import server, resource, http 13from twisted.web.server import Site, NOT_DONE_YET 14from random import randint 15from pprint import pprint 16from optparse import OptionParser, OptionValueError 17from time import strftime, time, localtime, sleep 18from threading import Thread, RLock 19import httplib, json 20from urlparse import urlparse 21import socket 22from copy import deepcopy 23from collections import deque 24from random import choice 25import logging 26 27# Request timeout in seconds 28REQUEST_TIMEOUT = 240 29# Session expiry, in seconds 30SESSION_TIMEOUT = REQUEST_TIMEOUT + 30 31# Time after which we remove a session, in seconds. 32SESSION_REMOVE_TIMEOUT = 3600 33 34# Create a logger 35logger = logging.getLogger('lobby-server') 36logger.setLevel(logging.DEBUG) 37ch = logging.StreamHandler() 38ch.setLevel(logging.DEBUG) 39ch.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) 40logger.addHandler(ch) 41 42class QueueManager: 43 def __init__(self): 44 self._queue = {} 45 self._queue_lock = RLock() 46 def get_queue(self, name): 47 with self._queue_lock: 48 if not self._queue.has_key(name): 49 logger.debug('Creating new queue for %s' % name) 50 queue = {'clients':{'session_id':-1, 'name':name, 'ip_chain':None}, 'messages':{}, 'requests':[], 'timeouts':{}, 'lock':RLock()} 51 self._queue[name] = queue 52 return self._queue[name] 53 def has_key(self, name): 54 with self._queue_lock: 55 return self._queue.has_key(name) 56 def remove_queue(self, q): 57 with self._queue_lock: 58 name = q['clients']['name'] 59 logger.debug('Removing session for %s(%d)' % (name, q['clients']['session_id'])) 60 assert self._queue.has_key(name), 'Attempted delete of non-key %s' % name 61 del self._queue[name] 62 63queue_manager = QueueManager() 64 65def make_session_id(): return randint(1, 2**31) 66 67class GameServerStatusQueryThread(Thread): 68 def __init__(self, game_server_url): 69 Thread.__init__(self) 70 self._last_seen = 0 71 self._headers = {"Content-type":"application/json", "Accept":"text/json, application/json"} 72 self._abort = False 73 o = urlparse(game_server_url) 74 self._game_server = o.netloc 75 self._game_server_path = o.path 76 self._stats_lock = RLock() 77 self._game_stats = {'total_games_being_played': 0, 'played':{}} 78 def run(self): 79 running = True 80 while running: 81 if self._abort: running = False 82 try: 83 conn = httplib.HTTPConnection(self._game_server, timeout = 30) 84 params = json.dumps({'type': "get_status", 'last_seen': self._last_seen}) 85 conn.request("POST", self._game_server_path, params, self._headers) 86 response = conn.getresponse() 87 #print response.status, response.reason 88 if response.status == 200: 89 data = json.loads(response.read()) 90 self._last_seen = data['status_id'] 91 if data.has_key('games'): 92 self.parse_game_data(data['games']) 93 else: 94 print "Status packet received without 'games' data" 95 conn.close() 96 if not self._abort: sleep(10) 97 except socket.timeout: 98 print 'Timeout waiting for current status from game server' 99 except Exception, e: 100 print e 101 sleep(10) 102 def abort(self): 103 self._abort = True 104 self.join() 105 def parse_game_data(self, games): 106 """ Parses the game data returned from the server, data consists of a lists of items 107 such as: 108 {u'id': 1327454801, u'started': True, u'type': u'game_info'} 109 type is always game_info (currently) 110 id is always the id returned when the game was created. 111 started is either True or False 112 """ 113 # todo: go through the lobby_games_list and any games that aren't in the list from the game 114 # server need to be removed and the references to them in the lobby_sessions need to be 115 # removed. 116 total_games_being_played = 0 117 played = {} 118 def get_game_stats(self): 119 with self._stats_lock: 120 game_stats = deepcopy(self._game_stats) 121 return game_stats 122 def ajax_post(self, msg): 123 completion = False 124 data = None 125 conn = httplib.HTTPConnection(self._game_server, timeout = 30) 126 params = json.dumps(msg) 127 try: 128 conn.request("POST", self._game_server_path, params, self._headers) 129 response = conn.getresponse() 130 #print response.status, response.reason 131 if response.status == 200: 132 data = json.loads(response.read()) 133 completion = True 134 else: 135 data = {'error':('%d: %s' % ( response.status, response.reason ))} 136 conn.close() 137 except socket.timeout: 138 print 'Timeout waiting for message response from game server' 139 return False, {'error': 'Timeout waiting for message response from game server'} 140 except Exception, e: 141 return False, {'error': str(e)} 142 return completion, data 143 144class LobbyHandler(resource.Resource): 145 isLeaf = True 146 def __init__(self, gsq): 147 resource.Resource.__init__(self) 148 self._gsq = gsq 149 self._requests = [] 150 self._messages = {} 151 def cleanup(self): 152 pass 153 def _delayedRender(self, q): 154 with q['lock']: 155 if q['timeouts'].has_key('delay'): 156 del q['timeouts']['delay'] 157 request.write(json.dumps({'request':'success', 'type': 'status', 'status_id':0, messages:[]})) 158 request.finish() 159 def _responseFailed(self, err, q): 160 logger.error('_responseFailed for %s(%d) -- %s' % (q['clients']['name'], q['clients']['session_id'], err)) 161 with q['lock']: 162 q['timeouts']['delay'].cancel() 163 del q['timeouts']['delay'] 164 def render_GET(self, request): 165 return self.render_POST(request) 166 def render_POST(self, request): 167 #pprint(request) 168 #pprint(request.headers) 169 #pprint(request.content.getvalue()) 170 req_data = json.loads(request.content.getvalue()) 171 request.setHeader('content-type', 'application/json') 172 request.setHeader('Access-Control-Allow-Origin','*') 173 headers = request.requestHeaders 174 ip_chain = headers.getRawHeaders('x-forwarded-for') 175 #pprint(req_data) 176 logger.debug(req_data) 177 178 if req_data.has_key('type'): 179 type = req_data['type'] 180 else: 181 req_data['request'] = 'failed' 182 req_data['error'] = 'bad_type' 183 return json.dumps(req_data) 184 if req_data.has_key('args'): 185 args = req_data['args'] 186 else: 187 args = {} 188 189 if req_data.has_key('username'): 190 username = req_data['username'] 191 else: 192 req_data['request'] = 'failed' 193 req_data['error'] = 'No username present.' 194 logger.error('Rejected request, no username') 195 return json.dumps(req_data) 196 197 if req_data.has_key('session_id'): 198 session_id = req_data['session_id'] 199 else: 200 req_data['request'] = 'failed' 201 req_data['error'] = 'No session_id present.' 202 logger.error('Rejected request, no session_id') 203 return json.dumps(req_data) 204 205 # Catch all 206 resp = req_data 207 req_data['request'] = 'failed' 208 resp['error'] = 'Unknown "type" of command' 209 210 if type == 'login': 211 resp = self.do_login(username, session_id, req_data['password'], ip_chain) 212 elif type == 'get_status': 213 if req_data.has_key('last_seen'): 214 last_seen = req_data['last_seen'] 215 else: 216 last_seen = 0 217 return self.get_status(username, session_id, last_seen, request) 218 219 #self._requests.append(request) 220 #call = reactor.callLater(10, self._delayedRender, request) 221 #request.notifyFinish().addErrback(self._responseFailed, call) 222 #return NOT_DONE_YET 223 return json.dumps(resp) 224 def do_login(self, username, session, password, ip_chain): 225 # passwords ignored for now, i.e. all logins are guest logins. 226 # first look for an existing session from same ip address 227 q = queue_manager.get_queue(username) 228 with q['lock']: 229 if session == -1: 230 if q['clients']['session_id'] == -1: 231 # New session 232 q['clients']['session_id'] = make_session_id() 233 q['clients']['ip_chain'] = ip_chain 234 q['timeouts']['client'] = reactor.callLater(SESSION_TIMEOUT, self.session_timeout, q) 235 logger.debug('New session %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 236 return {'request':'success', 'session_id':q['clients']['session_id'], 'type':'login'} 237 # Old session exists check IP's match 238 if q['clients']['ip_chain'] == ip_chain: 239 if q['timeouts'].has_key('client'): 240 q['timeouts']['client'].reset(SESSION_TIMEOUT) 241 else: 242 q['timeouts']['client'] = reactor.callLater(SESSION_TIMEOUT, self.session_timeout, q) 243 logger.debug('Existing Session %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 244 return {'request':'success', 'session_id':q['clients']['session_id'], 'type':'login'} 245 logger.error('Reject session IP\'s differ %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 246 return { 'request': 'failed', 'error':'User already logged in from different IP', 'type':'login' } 247 # We've been given a session id, so possibly old session. 248 if q['clients']['session_id'] == -1: 249 # looks like session has expired, we'll let them keep there session_id 250 q['clients']['session_id'] = session 251 q['clients']['ip_chain'] = ip_chain 252 if q['timeouts'].has_key('client'): 253 q['timeouts']['client'].reset(SESSION_TIMEOUT) 254 else: 255 q['timeouts']['client'] = reactor.callLater(SESSION_TIMEOUT, self.session_timeout, q) 256 logger.debug('Refresh expired session %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 257 return {'request':'success', 'session_id':q['clients']['session_id'], 'type':'login'} 258 if q['clients']['session_id'] == session: 259 # Same session, just validate 260 q['clients']['ip_chain'] = ip_chain 261 if q['timeouts'].has_key('client'): 262 q['timeouts']['client'].reset(SESSION_TIMEOUT) 263 else: 264 q['timeouts']['client'] = reactor.callLater(SESSION_TIMEOUT, self.session_timeout, q) 265 logger.debug('Existing Session %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 266 return {'request':'success', 'session_id':q['clients']['session_id'], 'type':'login'} 267 # Sessions don't match, check ips. 268 if q['clients']['ip_chain'] == ip_chain: 269 if q['timeouts'].has_key('client'): 270 q['timeouts']['client'].reset(SESSION_TIMEOUT) 271 else: 272 q['timeouts']['client'] = reactor.callLater(SESSION_TIMEOUT, self.session_timeout, q) 273 logger.debug('Existing session (id\'s differ ip\'s same) %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 274 return {'request':'success', 'session_id':q['clients']['session_id'], 'type':'login'} 275 # IP check failed 276 logger.error('Reject session IP\'s differ %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 277 return { 'request': 'failed', 'error':'User already logged in from different IP', 'type':'login' } 278 def session_timeout(self, q): 279 with q['lock']: 280 logger.debug('Session timeout %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 281 q['clients']['session_id'] = -1 282 q['timeouts']['client'] = reactor.callLater(SESSION_REMOVE_TIMEOUT, self.session_remove, q) 283 def session_remove(self, q): 284 logger.debug('Session remove %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 285 queue_manager.remove_queue(q) 286 def get_status(self, username, session_id, last_seen, request): 287 if not queue_manager.has_key(username): 288 logger.debug('Reject get_status -- user not logged in %s(%d)' % (username, session_id)) 289 return json.dumps({ 'request':'failed', 'error':'No session exists for %s. Please login first.' % username }) 290 q = queue_manager.get_queue(username) 291 if q['clients']['session_id'] != session_id: 292 logger.debug('Reject get_status session id\'s differ %s(%d) -- %d' % (q['clients']['name'], q['clients']['session_id'], session_id)) 293 return json.dumps({ 'request':'failed', 'error':'Non-matching session_id'}) 294 msgs = q['messages'] 295 max_msg = 0 296 msg_list = [] 297 for k,v in msgs.iteritems(): 298 if last_seen < k: 299 max_msg = max(max_msg,k) 300 msg_list.append(v) 301 q['timeouts']['client'].reset(SESSION_TIMEOUT) 302 if q['timeouts'].has_key('delay'): 303 q['timeouts']['delay'].cancel() 304 del q['timeouts']['delay'] 305 if len(msg_list) == 0: 306 # delay till later and try again 307 q['requests'].append(request) 308 q['timeouts']['delay'] = reactor.callLater(REQUEST_TIMEOUT, self._delayedRender, q) 309 request.notifyFinish().addErrback(self._responseFailed, q) 310 logger.debug('Deferred get_status response %s(%d)' % (q['clients']['name'], q['clients']['session_id'])) 311 return NOT_DONE_YET 312 # Some messages so just send them. 313 logger.debug('get_status returns %d messages %s(%d)' % (len(msg_list), q['clients']['name'], q['clients']['session_id'])) 314 return json.dumps({ 'request':'success', 'type':'status', 'status_id':max_msg, 'messages':msg_list }) 315 316def main(options, args): 317 if len(args) > 0: 318 gsq = GameServerStatusQueryThread(args[0]) 319 gsq.start() 320 else: 321 gsq = None 322 root = resource.Resource() 323 lobby_handler = LobbyHandler(gsq) 324 factory = Site(lobby_handler) 325 reactor.listenTCP(options.port, factory) 326 reactor.run() 327 lobby_handler.cleanup() 328 if gsq: gsq.abort() 329 330def get_opts(): 331 usage = 'usage: %prog <options> <game_server:port>' 332 parser = OptionParser(usage) 333 parser.add_option('-v', '--verbose', action='store_true', default=False, dest='verbose') 334 parser.add_option('-p', '--port', action='store', default=8181, dest='port') 335 return parser.parse_args() 336 337if __name__ == '__main__': 338 # Running standalone mode not as a service. 339 options, args = get_opts() 340 main(options, args) 341else: 342 # run under twistd using -y 343 options, args = get_opts() 344 gsq = GameServerStatusQueryThread(args[0]) 345 gsq.start() 346 root = Resource() 347 root.putChild('', FormPage()) 348 application = Application('Lobby Service') 349 TCPServer(options.port, Site(root)).setServiceParent(application) 350