1from __future__ import with_statement 2from rpyc import Service, async_ 3from rpyc.utils.server import ThreadedServer 4from threading import RLock 5 6 7USERS_DB = { 8 "foo": "bar", 9 "spam": "bacon", 10 "eggs": "viking", 11} 12broadcast_lock = RLock() 13tokens = set() 14 15 16class UserToken(object): 17 def __init__(self, name, callback): 18 self.name = name 19 self.stale = False 20 self.callback = callback 21 self.broadcast("* Hello %s *" % (self.name,)) 22 tokens.add(self) 23 24 def exposed_say(self, message): 25 if self.stale: 26 raise ValueError("User token is stale") 27 self.broadcast("[%s] %s" % (self.name, message)) 28 29 def exposed_logout(self): 30 if self.stale: 31 return 32 self.stale = True 33 self.callback = None 34 tokens.discard(self) 35 self.broadcast("* Goodbye %s *" % (self.name,)) 36 37 def broadcast(self, text): 38 global tokens 39 stale = set() 40 with broadcast_lock: 41 for tok in tokens: 42 try: 43 tok.callback(text) 44 except Exception: 45 stale.add(tok) 46 tokens -= stale 47 48 49class ChatService(Service): 50 def on_connect(self, conn): 51 self.token = None 52 53 def on_disconnect(self, conn): 54 if self.token: 55 self.token.exposed_logout() 56 57 def exposed_login(self, username, password, callback): 58 if self.token and not self.token.stale: 59 raise ValueError("already logged in") 60 if username in USERS_DB and password == USERS_DB[username]: 61 self.token = UserToken(username, async_(callback)) 62 return self.token 63 else: 64 raise ValueError("wrong username or password") 65 66 67if __name__ == "__main__": 68 t = ThreadedServer(ChatService, port=19912) 69 t.start() 70