1 #![deny(warnings)]
2 extern crate futures;
3 extern crate pretty_env_logger;
4 extern crate warp;
5 
6 use std::collections::HashMap;
7 use std::sync::{
8     atomic::{AtomicUsize, Ordering},
9     Arc, Mutex,
10 };
11 
12 use futures::sync::mpsc;
13 use futures::{Future, Stream};
14 use warp::ws::{Message, WebSocket};
15 use warp::Filter;
16 
/// Our global unique user id counter.
///
/// Starts at 1. `user_connected` takes the current value with an atomic
/// `fetch_add(1, ..)`, so every websocket connection gets a distinct id
/// without any additional locking.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
19 
/// Our state of currently connected users.
///
/// - Key is their id (as handed out from `NEXT_USER_ID`)
/// - Value is a sender of `warp::ws::Message`; anything sent through it is
///   forwarded to that user's websocket by the task spawned in
///   `user_connected`
///
/// The map sits behind `Arc<Mutex<..>>` so that the handle can be cloned
/// per connection while all clones refer to the same underlying table.
/// Entries are inserted by `user_connected`, read by `user_message` and
/// removed by `user_disconnected`.
type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
25 
main()26 fn main() {
27     pretty_env_logger::init();
28 
29     // Keep track of all connected users, key is usize, value
30     // is a websocket sender.
31     let users = Arc::new(Mutex::new(HashMap::new()));
32     // Turn our "state" into a new Filter...
33     let users = warp::any().map(move || users.clone());
34 
35     // GET /chat -> websocket upgrade
36     let chat = warp::path("chat")
37         // The `ws2()` filter will prepare Websocket handshake...
38         .and(warp::ws2())
39         .and(users)
40         .map(|ws: warp::ws::Ws2, users| {
41             // This will call our function if the handshake succeeds.
42             ws.on_upgrade(move |socket| user_connected(socket, users))
43         });
44 
45     // GET / -> index html
46     let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
47 
48     let routes = index.or(chat);
49 
50     warp::serve(routes).run(([127, 0, 0, 1], 3030));
51 }
52 
user_connected(ws: WebSocket, users: Users) -> impl Future<Item = (), Error = ()>53 fn user_connected(ws: WebSocket, users: Users) -> impl Future<Item = (), Error = ()> {
54     // Use a counter to assign a new unique ID for this user.
55     let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
56 
57     eprintln!("new chat user: {}", my_id);
58 
59     // Split the socket into a sender and receive of messages.
60     let (user_ws_tx, user_ws_rx) = ws.split();
61 
62     // Use an unbounded channel to handle buffering and flushing of messages
63     // to the websocket...
64     let (tx, rx) = mpsc::unbounded();
65     warp::spawn(
66         rx.map_err(|()| -> warp::Error { unreachable!("unbounded rx never errors") })
67             .forward(user_ws_tx)
68             .map(|_tx_rx| ())
69             .map_err(|ws_err| eprintln!("websocket send error: {}", ws_err)),
70     );
71 
72     // Save the sender in our list of connected users.
73     users.lock().unwrap().insert(my_id, tx);
74 
75     // Return a `Future` that is basically a state machine managing
76     // this specific user's connection.
77 
78     // Make an extra clone to give to our disconnection handler...
79     let users2 = users.clone();
80 
81     user_ws_rx
82         // Every time the user sends a message, broadcast it to
83         // all other users...
84         .for_each(move |msg| {
85             user_message(my_id, msg, &users);
86             Ok(())
87         })
88         // for_each will keep processing as long as the user stays
89         // connected. Once they disconnect, then...
90         .then(move |result| {
91             user_disconnected(my_id, &users2);
92             result
93         })
94         // If at any time, there was a websocket error, log here...
95         .map_err(move |e| {
96             eprintln!("websocket error(uid={}): {}", my_id, e);
97         })
98 }
99 
user_message(my_id: usize, msg: Message, users: &Users)100 fn user_message(my_id: usize, msg: Message, users: &Users) {
101     // Skip any non-Text messages...
102     let msg = if let Ok(s) = msg.to_str() {
103         s
104     } else {
105         return;
106     };
107 
108     let new_msg = format!("<User#{}>: {}", my_id, msg);
109 
110     // New message from this user, send it to everyone else (except same uid)...
111     //
112     // We use `retain` instead of a for loop so that we can reap any user that
113     // appears to have disconnected.
114     for (&uid, tx) in users.lock().unwrap().iter() {
115         if my_id != uid {
116             match tx.unbounded_send(Message::text(new_msg.clone())) {
117                 Ok(()) => (),
118                 Err(_disconnected) => {
119                     // The tx is disconnected, our `user_disconnected` code
120                     // should be happening in another task, nothing more to
121                     // do here.
122                 }
123             }
124         }
125     }
126 }
127 
user_disconnected(my_id: usize, users: &Users)128 fn user_disconnected(my_id: usize, users: &Users) {
129     eprintln!("good bye user: {}", my_id);
130 
131     // Stream closed up, so remove from the user list
132     users.lock().unwrap().remove(&my_id);
133 }
134 
/// The chat page returned for `GET /`.
///
/// The embedded script opens a websocket to `/chat` on the same host,
/// appends every received message to the `#chat` element as a new
/// paragraph, and on "Send" transmits the text box contents and echoes
/// them locally prefixed with `<You>: `.
static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
    <head>
        <title>Warp Chat</title>
    </head>
    <body>
        <h1>warp chat</h1>
        <div id="chat">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="text" />
        <button type="button" id="send">Send</button>
        <script type="text/javascript">
        var uri = 'ws://' + location.host + '/chat';
        var ws = new WebSocket(uri);

        function message(data) {
            var line = document.createElement('p');
            line.innerText = data;
            chat.appendChild(line);
        }

        ws.onopen = function() {
            chat.innerHTML = "<p><em>Connected!</em></p>";
        }

        ws.onmessage = function(msg) {
            message(msg.data);
        };

        send.onclick = function() {
            var msg = text.value;
            ws.send(msg);
            text.value = '';

            message('<You>: ' + msg);
        };
        </script>
    </body>
</html>
"#;
177