use futures::{Stream, StreamExt};
use std::collections::HashMap;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Mutex,
};
use tokio::sync::mpsc;
use warp::{sse::ServerSentEvent, Filter};

10 #[tokio::main]
main()11 async fn main() {
12 pretty_env_logger::init();
13
14 // Keep track of all connected users, key is usize, value
15 // is an event stream sender.
16 let users = Arc::new(Mutex::new(HashMap::new()));
17 // Turn our "state" into a new Filter...
18 let users = warp::any().map(move || users.clone());
19
20 // POST /chat -> send message
21 let chat_send = warp::path("chat")
22 .and(warp::post())
23 .and(warp::path::param::<usize>())
24 .and(warp::body::content_length_limit(500))
25 .and(
26 warp::body::bytes().and_then(|body: bytes::Bytes| async move {
27 std::str::from_utf8(&body)
28 .map(String::from)
29 .map_err(|_e| warp::reject::custom(NotUtf8))
30 }),
31 )
32 .and(users.clone())
33 .map(|my_id, msg, users| {
34 user_message(my_id, msg, &users);
35 warp::reply()
36 });
37
38 // GET /chat -> messages stream
39 let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| {
40 // reply using server-sent events
41 let stream = user_connected(users);
42 warp::sse::reply(warp::sse::keep_alive().stream(stream))
43 });
44
45 // GET / -> index html
46 let index = warp::path::end().map(|| {
47 warp::http::Response::builder()
48 .header("content-type", "text/html; charset=utf-8")
49 .body(INDEX_HTML)
50 });
51
52 let routes = index.or(chat_recv).or(chat_send);
53
54 warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
55 }
56
/// Our global unique user id counter.
///
/// Starts at 1; `user_connected` takes the next value with `fetch_add`.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);

/// Message variants.
#[derive(Debug)]
enum Message {
    /// The id assigned to a user; `user_connected` queues this as the first
    /// item on that user's channel.
    UserId(usize),
    /// A line of chat text to deliver to a user.
    Reply(String),
}

67 #[derive(Debug)]
68 struct NotUtf8;
69 impl warp::reject::Reject for NotUtf8 {}
70
71 /// Our state of currently connected users.
72 ///
73 /// - Key is their id
74 /// - Value is a sender of `Message`
75 type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
76
user_connected( users: Users, ) -> impl Stream<Item = Result<impl ServerSentEvent + Send + 'static, warp::Error>> + Send + 'static77 fn user_connected(
78 users: Users,
79 ) -> impl Stream<Item = Result<impl ServerSentEvent + Send + 'static, warp::Error>> + Send + 'static
80 {
81 // Use a counter to assign a new unique ID for this user.
82 let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
83
84 eprintln!("new chat user: {}", my_id);
85
86 // Use an unbounded channel to handle buffering and flushing of messages
87 // to the event source...
88 let (tx, rx) = mpsc::unbounded_channel();
89
90 tx.send(Message::UserId(my_id))
91 // rx is right above, so this cannot fail
92 .unwrap();
93
94 // Save the sender in our list of connected users.
95 users.lock().unwrap().insert(my_id, tx);
96
97 // Convert messages into Server-Sent Events and return resulting stream.
98 rx.map(|msg| match msg {
99 Message::UserId(my_id) => Ok((warp::sse::event("user"), warp::sse::data(my_id)).into_a()),
100 Message::Reply(reply) => Ok(warp::sse::data(reply).into_b()),
101 })
102 }
103
user_message(my_id: usize, msg: String, users: &Users)104 fn user_message(my_id: usize, msg: String, users: &Users) {
105 let new_msg = format!("<User#{}>: {}", my_id, msg);
106
107 // New message from this user, send it to everyone else (except same uid)...
108 //
109 // We use `retain` instead of a for loop so that we can reap any user that
110 // appears to have disconnected.
111 users.lock().unwrap().retain(|uid, tx| {
112 if my_id == *uid {
113 // don't send to same user, but do retain
114 true
115 } else {
116 // If not `is_ok`, the SSE stream is gone, and so don't retain
117 tx.send(Message::Reply(new_msg.clone())).is_ok()
118 }
119 });
120 }
121
/// The HTML page served at `GET /`.
///
/// Its inline script opens an `EventSource` on `/chat`, remembers the id
/// delivered by the `user` event, appends each received message to the page,
/// and posts the text box contents to `/chat/<id>` when "Send" is clicked.
static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
<head>
<title>Warp Chat</title>
</head>
<body>
<h1>warp chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="send">Send</button>
<script type="text/javascript">
var uri = 'http://' + location.host + '/chat';
var sse = new EventSource(uri);
function message(data) {
var line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
sse.onopen = function() {
chat.innerHTML = "<p><em>Connected!</em></p>";
}
var user_id;
sse.addEventListener("user", function(msg) {
user_id = msg.data;
});
sse.onmessage = function(msg) {
message(msg.data);
};
send.onclick = function() {
var msg = text.value;
var xhr = new XMLHttpRequest();
xhr.open("POST", uri + '/' + user_id, true);
xhr.send(msg);
text.value = '';
message('<You>: ' + msg);
};
</script>
</body>
</html>
"#;
