1 use futures::{Stream, StreamExt};
2 use std::collections::HashMap;
3 use std::sync::{
4     atomic::{AtomicUsize, Ordering},
5     Arc, Mutex,
6 };
7 use tokio::sync::mpsc;
8 use warp::{sse::ServerSentEvent, Filter};
9 
10 #[tokio::main]
main()11 async fn main() {
12     pretty_env_logger::init();
13 
14     // Keep track of all connected users, key is usize, value
15     // is an event stream sender.
16     let users = Arc::new(Mutex::new(HashMap::new()));
17     // Turn our "state" into a new Filter...
18     let users = warp::any().map(move || users.clone());
19 
20     // POST /chat -> send message
21     let chat_send = warp::path("chat")
22         .and(warp::post())
23         .and(warp::path::param::<usize>())
24         .and(warp::body::content_length_limit(500))
25         .and(
26             warp::body::bytes().and_then(|body: bytes::Bytes| async move {
27                 std::str::from_utf8(&body)
28                     .map(String::from)
29                     .map_err(|_e| warp::reject::custom(NotUtf8))
30             }),
31         )
32         .and(users.clone())
33         .map(|my_id, msg, users| {
34             user_message(my_id, msg, &users);
35             warp::reply()
36         });
37 
38     // GET /chat -> messages stream
39     let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| {
40         // reply using server-sent events
41         let stream = user_connected(users);
42         warp::sse::reply(warp::sse::keep_alive().stream(stream))
43     });
44 
45     // GET / -> index html
46     let index = warp::path::end().map(|| {
47         warp::http::Response::builder()
48             .header("content-type", "text/html; charset=utf-8")
49             .body(INDEX_HTML)
50     });
51 
52     let routes = index.or(chat_recv).or(chat_send);
53 
54     warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
55 }
56 
57 /// Our global unique user id counter.
58 static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
59 
60 /// Message variants.
61 #[derive(Debug)]
62 enum Message {
63     UserId(usize),
64     Reply(String),
65 }
66 
67 #[derive(Debug)]
68 struct NotUtf8;
69 impl warp::reject::Reject for NotUtf8 {}
70 
71 /// Our state of currently connected users.
72 ///
73 /// - Key is their id
74 /// - Value is a sender of `Message`
75 type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
76 
user_connected( users: Users, ) -> impl Stream<Item = Result<impl ServerSentEvent + Send + 'static, warp::Error>> + Send + 'static77 fn user_connected(
78     users: Users,
79 ) -> impl Stream<Item = Result<impl ServerSentEvent + Send + 'static, warp::Error>> + Send + 'static
80 {
81     // Use a counter to assign a new unique ID for this user.
82     let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
83 
84     eprintln!("new chat user: {}", my_id);
85 
86     // Use an unbounded channel to handle buffering and flushing of messages
87     // to the event source...
88     let (tx, rx) = mpsc::unbounded_channel();
89 
90     tx.send(Message::UserId(my_id))
91         // rx is right above, so this cannot fail
92         .unwrap();
93 
94     // Save the sender in our list of connected users.
95     users.lock().unwrap().insert(my_id, tx);
96 
97     // Convert messages into Server-Sent Events and return resulting stream.
98     rx.map(|msg| match msg {
99         Message::UserId(my_id) => Ok((warp::sse::event("user"), warp::sse::data(my_id)).into_a()),
100         Message::Reply(reply) => Ok(warp::sse::data(reply).into_b()),
101     })
102 }
103 
user_message(my_id: usize, msg: String, users: &Users)104 fn user_message(my_id: usize, msg: String, users: &Users) {
105     let new_msg = format!("<User#{}>: {}", my_id, msg);
106 
107     // New message from this user, send it to everyone else (except same uid)...
108     //
109     // We use `retain` instead of a for loop so that we can reap any user that
110     // appears to have disconnected.
111     users.lock().unwrap().retain(|uid, tx| {
112         if my_id == *uid {
113             // don't send to same user, but do retain
114             true
115         } else {
116             // If not `is_ok`, the SSE stream is gone, and so don't retain
117             tx.send(Message::Reply(new_msg.clone())).is_ok()
118         }
119     });
120 }
121 
122 static INDEX_HTML: &str = r#"
123 <!DOCTYPE html>
124 <html>
125     <head>
126         <title>Warp Chat</title>
127     </head>
128     <body>
129         <h1>warp chat</h1>
130         <div id="chat">
131             <p><em>Connecting...</em></p>
132         </div>
133         <input type="text" id="text" />
134         <button type="button" id="send">Send</button>
135         <script type="text/javascript">
136         var uri = 'http://' + location.host + '/chat';
137         var sse = new EventSource(uri);
138         function message(data) {
139             var line = document.createElement('p');
140             line.innerText = data;
141             chat.appendChild(line);
142         }
143         sse.onopen = function() {
144             chat.innerHTML = "<p><em>Connected!</em></p>";
145         }
146         var user_id;
147         sse.addEventListener("user", function(msg) {
148             user_id = msg.data;
149         });
150         sse.onmessage = function(msg) {
151             message(msg.data);
152         };
153         send.onclick = function() {
154             var msg = text.value;
155             var xhr = new XMLHttpRequest();
156             xhr.open("POST", uri + '/' + user_id, true);
157             xhr.send(msg);
158             text.value = '';
159             message('<You>: ' + msg);
160         };
161         </script>
162     </body>
163 </html>
164 "#;
165