1from __future__ import with_statement
2from rpyc import Service, async_
3from rpyc.utils.server import ThreadedServer
4from threading import RLock
5
6
# Demo credential table: username -> plain-text password, consulted at login.
USERS_DB = dict(
    foo="bar",
    spam="bacon",
    eggs="viking",
)
# Re-entrant lock held while a broadcast walks the set of tokens.
broadcast_lock = RLock()
# Tokens of the users currently registered to receive broadcasts.
tokens = set()
14
15
class UserToken(object):
    """Handle for one logged-in chat user.

    Creating a token greets the users that are already registered and then
    adds the token to the module-level ``tokens`` set.  Every change to that
    set is made while holding ``broadcast_lock`` so that it can never be
    resized underneath a broadcast that is iterating over it.
    """

    def __init__(self, name, callback):
        """Announce *name* to the current users and register this token.

        :param name: display name used in broadcast messages.
        :param callback: callable invoked with the text of every broadcast.
        """
        self.name = name
        self.stale = False
        self.callback = callback
        # Greeting and registration happen under one lock acquisition so no
        # other broadcast can slip in between them.  The greeting is sent
        # before registering, so the new user does not receive it.
        with broadcast_lock:
            self.broadcast("* Hello %s *" % (self.name,))
            tokens.add(self)

    def exposed_say(self, message):
        """Broadcast *message* to all registered users, tagged with our name.

        :raises ValueError: if this token has already been logged out.
        """
        if self.stale:
            raise ValueError("User token is stale")
        self.broadcast("[%s] %s" % (self.name, message))

    def exposed_logout(self):
        """Unregister the token and tell the remaining users; idempotent."""
        # The stale check-and-set and the set mutation are done atomically so
        # two concurrent logouts cannot both announce the departure.
        with broadcast_lock:
            if self.stale:
                return
            self.stale = True
            self.callback = None
            tokens.discard(self)
        self.broadcast("* Goodbye %s *" % (self.name,))

    def broadcast(self, text):
        """Deliver *text* to every registered token's callback.

        Tokens whose callback raises are dropped from ``tokens``; the error
        itself is deliberately not propagated so that one broken peer cannot
        prevent delivery to the others.
        """
        failed = set()
        with broadcast_lock:
            # Iterate over a snapshot: the lock is re-entrant, so a callback
            # running on this thread may log a user in or out and thereby
            # resize ``tokens`` while we are still delivering.
            for tok in list(tokens):
                deliver = tok.callback
                if deliver is None:
                    # Logged out (and already unregistered) mid-broadcast.
                    continue
                try:
                    deliver(text)
                except Exception:
                    failed.add(tok)
            tokens.difference_update(failed)
47
48
class ChatService(Service):
    """Per-connection chat service that tracks at most one login token."""

    def on_connect(self, conn):
        """Start the connection with no user logged in."""
        self.token = None

    def on_disconnect(self, conn):
        """Log out the connection's user, if there is one."""
        session = self.token
        if not session:
            return
        session.exposed_logout()

    def exposed_login(self, username, password, callback):
        """Authenticate against ``USERS_DB`` and return a new ``UserToken``.

        *callback* is wrapped with ``async_`` before being handed to the
        token.

        :raises ValueError: if this connection already holds a live token,
            or if the username/password pair is not in ``USERS_DB``.
        """
        current = self.token
        if current and not current.stale:
            raise ValueError("already logged in")

        credentials_ok = username in USERS_DB and password == USERS_DB[username]
        if not credentials_ok:
            raise ValueError("wrong username or password")

        self.token = UserToken(username, async_(callback))
        return self.token
65
66
if __name__ == "__main__":
    # Serve ChatService on port 19912 using rpyc's threaded server.
    server = ThreadedServer(ChatService, port=19912)
    server.start()
70