1 #![deny(warnings)]
2 extern crate futures;
3 extern crate pretty_env_logger;
4 extern crate warp;
5
6 use std::collections::HashMap;
7 use std::sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc, Mutex,
10 };
11
12 use futures::sync::mpsc;
13 use futures::{Future, Stream};
14 use warp::ws::{Message, WebSocket};
15 use warp::Filter;
16
17 /// Our global unique user id counter.
18 static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
19
20 /// Our state of currently connected users.
21 ///
22 /// - Key is their id
23 /// - Value is a sender of `warp::ws::Message`
24 type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
25
main()26 fn main() {
27 pretty_env_logger::init();
28
29 // Keep track of all connected users, key is usize, value
30 // is a websocket sender.
31 let users = Arc::new(Mutex::new(HashMap::new()));
32 // Turn our "state" into a new Filter...
33 let users = warp::any().map(move || users.clone());
34
35 // GET /chat -> websocket upgrade
36 let chat = warp::path("chat")
37 // The `ws2()` filter will prepare Websocket handshake...
38 .and(warp::ws2())
39 .and(users)
40 .map(|ws: warp::ws::Ws2, users| {
41 // This will call our function if the handshake succeeds.
42 ws.on_upgrade(move |socket| user_connected(socket, users))
43 });
44
45 // GET / -> index html
46 let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
47
48 let routes = index.or(chat);
49
50 warp::serve(routes).run(([127, 0, 0, 1], 3030));
51 }
52
user_connected(ws: WebSocket, users: Users) -> impl Future<Item = (), Error = ()>53 fn user_connected(ws: WebSocket, users: Users) -> impl Future<Item = (), Error = ()> {
54 // Use a counter to assign a new unique ID for this user.
55 let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
56
57 eprintln!("new chat user: {}", my_id);
58
59 // Split the socket into a sender and receive of messages.
60 let (user_ws_tx, user_ws_rx) = ws.split();
61
62 // Use an unbounded channel to handle buffering and flushing of messages
63 // to the websocket...
64 let (tx, rx) = mpsc::unbounded();
65 warp::spawn(
66 rx.map_err(|()| -> warp::Error { unreachable!("unbounded rx never errors") })
67 .forward(user_ws_tx)
68 .map(|_tx_rx| ())
69 .map_err(|ws_err| eprintln!("websocket send error: {}", ws_err)),
70 );
71
72 // Save the sender in our list of connected users.
73 users.lock().unwrap().insert(my_id, tx);
74
75 // Return a `Future` that is basically a state machine managing
76 // this specific user's connection.
77
78 // Make an extra clone to give to our disconnection handler...
79 let users2 = users.clone();
80
81 user_ws_rx
82 // Every time the user sends a message, broadcast it to
83 // all other users...
84 .for_each(move |msg| {
85 user_message(my_id, msg, &users);
86 Ok(())
87 })
88 // for_each will keep processing as long as the user stays
89 // connected. Once they disconnect, then...
90 .then(move |result| {
91 user_disconnected(my_id, &users2);
92 result
93 })
94 // If at any time, there was a websocket error, log here...
95 .map_err(move |e| {
96 eprintln!("websocket error(uid={}): {}", my_id, e);
97 })
98 }
99
user_message(my_id: usize, msg: Message, users: &Users)100 fn user_message(my_id: usize, msg: Message, users: &Users) {
101 // Skip any non-Text messages...
102 let msg = if let Ok(s) = msg.to_str() {
103 s
104 } else {
105 return;
106 };
107
108 let new_msg = format!("<User#{}>: {}", my_id, msg);
109
110 // New message from this user, send it to everyone else (except same uid)...
111 //
112 // We use `retain` instead of a for loop so that we can reap any user that
113 // appears to have disconnected.
114 for (&uid, tx) in users.lock().unwrap().iter() {
115 if my_id != uid {
116 match tx.unbounded_send(Message::text(new_msg.clone())) {
117 Ok(()) => (),
118 Err(_disconnected) => {
119 // The tx is disconnected, our `user_disconnected` code
120 // should be happening in another task, nothing more to
121 // do here.
122 }
123 }
124 }
125 }
126 }
127
user_disconnected(my_id: usize, users: &Users)128 fn user_disconnected(my_id: usize, users: &Users) {
129 eprintln!("good bye user: {}", my_id);
130
131 // Stream closed up, so remove from the user list
132 users.lock().unwrap().remove(&my_id);
133 }
134
135 static INDEX_HTML: &str = r#"
136 <!DOCTYPE html>
137 <html>
138 <head>
139 <title>Warp Chat</title>
140 </head>
141 <body>
142 <h1>warp chat</h1>
143 <div id="chat">
144 <p><em>Connecting...</em></p>
145 </div>
146 <input type="text" id="text" />
147 <button type="button" id="send">Send</button>
148 <script type="text/javascript">
149 var uri = 'ws://' + location.host + '/chat';
150 var ws = new WebSocket(uri);
151
152 function message(data) {
153 var line = document.createElement('p');
154 line.innerText = data;
155 chat.appendChild(line);
156 }
157
158 ws.onopen = function() {
159 chat.innerHTML = "<p><em>Connected!</em></p>";
160 }
161
162 ws.onmessage = function(msg) {
163 message(msg.data);
164 };
165
166 send.onclick = function() {
167 var msg = text.value;
168 ws.send(msg);
169 text.value = '';
170
171 message('<You>: ' + msg);
172 };
173 </script>
174 </body>
175 </html>
176 "#;
177