1 #![deny(warnings)]
2
3 use futures::{FutureExt, SinkExt, StreamExt};
4 use warp::ws::Message;
5 use warp::Filter;
6
7 #[tokio::test]
upgrade()8 async fn upgrade() {
9 let _ = pretty_env_logger::try_init();
10
11 let route = warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|_| async {}));
12
13 // From https://tools.ietf.org/html/rfc6455#section-1.2
14 let key = "dGhlIHNhbXBsZSBub25jZQ==";
15 let accept = "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=";
16
17 let resp = warp::test::request()
18 .header("connection", "upgrade")
19 .header("upgrade", "websocket")
20 .header("sec-websocket-version", "13")
21 .header("sec-websocket-key", key)
22 .reply(&route)
23 .await;
24
25 assert_eq!(resp.status(), 101);
26 assert_eq!(resp.headers()["connection"], "upgrade");
27 assert_eq!(resp.headers()["upgrade"], "websocket");
28 assert_eq!(resp.headers()["sec-websocket-accept"], accept);
29
30 let resp = warp::test::request()
31 .header("connection", "keep-alive, Upgrade")
32 .header("upgrade", "Websocket")
33 .header("sec-websocket-version", "13")
34 .header("sec-websocket-key", key)
35 .reply(&route)
36 .await;
37
38 assert_eq!(resp.status(), 101);
39 }
40
41 #[tokio::test]
fail()42 async fn fail() {
43 let _ = pretty_env_logger::try_init();
44
45 let route = warp::any().map(warp::reply);
46
47 warp::test::ws()
48 .handshake(route)
49 .await
50 .expect_err("handshake non-websocket route should fail");
51 }
52
53 #[tokio::test]
text()54 async fn text() {
55 let _ = pretty_env_logger::try_init();
56
57 let mut client = warp::test::ws()
58 .handshake(ws_echo())
59 .await
60 .expect("handshake");
61
62 client.send_text("hello warp").await;
63
64 let msg = client.recv().await.expect("recv");
65 assert_eq!(msg.to_str(), Ok("hello warp"));
66 }
67
68 #[tokio::test]
binary()69 async fn binary() {
70 let _ = pretty_env_logger::try_init();
71
72 let mut client = warp::test::ws()
73 .handshake(ws_echo())
74 .await
75 .expect("handshake");
76
77 client.send(warp::ws::Message::binary(&b"bonk"[..])).await;
78 let msg = client.recv().await.expect("recv");
79 assert!(msg.is_binary());
80 assert_eq!(msg.as_bytes(), &b"bonk"[..]);
81 }
82
83 #[tokio::test]
send_ping()84 async fn send_ping() {
85 let _ = pretty_env_logger::try_init();
86
87 let filter = warp::ws().map(|ws: warp::ws::Ws| {
88 ws.on_upgrade(|mut websocket| {
89 async move {
90 websocket.send(Message::ping("srv")).await.unwrap();
91 // assume the client will pong back
92 let msg = websocket.next().await.expect("item").expect("ok");
93 assert!(msg.is_pong());
94 assert_eq!(msg.as_bytes(), &b"srv"[..]);
95 }
96 })
97 });
98
99 let mut client = warp::test::ws().handshake(filter).await.expect("handshake");
100
101 let msg = client.recv().await.expect("recv");
102 assert!(msg.is_ping());
103 assert_eq!(msg.as_bytes(), &b"srv"[..]);
104
105 client.recv_closed().await.expect("closed");
106 }
107
108 #[tokio::test]
echo_pings()109 async fn echo_pings() {
110 let _ = pretty_env_logger::try_init();
111
112 let mut client = warp::test::ws()
113 .handshake(ws_echo())
114 .await
115 .expect("handshake");
116
117 client.send(Message::ping("clt")).await;
118
119 // tungstenite sends the PONG first
120 let msg = client.recv().await.expect("recv");
121 assert!(msg.is_pong());
122 assert_eq!(msg.as_bytes(), &b"clt"[..]);
123
124 // and then `ws_echo` sends us back the same PING
125 let msg = client.recv().await.expect("recv");
126 assert!(msg.is_ping());
127 assert_eq!(msg.as_bytes(), &b"clt"[..]);
128
129 // and then our client would have sent *its* PONG
130 // and `ws_echo` would send *that* back too
131 let msg = client.recv().await.expect("recv");
132 assert!(msg.is_pong());
133 assert_eq!(msg.as_bytes(), &b"clt"[..]);
134 }
135
136 #[tokio::test]
closed()137 async fn closed() {
138 let _ = pretty_env_logger::try_init();
139
140 let route =
141 warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|websocket| websocket.close().map(|_| ())));
142
143 let mut client = warp::test::ws().handshake(route).await.expect("handshake");
144
145 client.recv_closed().await.expect("closed");
146 }
147
148 #[tokio::test]
limit_message_size()149 async fn limit_message_size() {
150 let _ = pretty_env_logger::try_init();
151
152 let echo = warp::ws().map(|ws: warp::ws::Ws| {
153 ws.max_message_size(1024).on_upgrade(|websocket| {
154 // Just echo all messages back...
155 let (tx, rx) = websocket.split();
156 rx.forward(tx).map(|result| {
157 assert!(result.is_err());
158 assert_eq!(
159 format!("{}", result.unwrap_err()).as_str(),
160 "Space limit exceeded: Message too big: 0 + 1025 > 1024"
161 );
162 })
163 })
164 });
165 let mut client = warp::test::ws().handshake(echo).await.expect("handshake");
166
167 client.send(warp::ws::Message::binary(vec![0; 1025])).await;
168 client.send_text("hello warp").await;
169 assert!(client.recv().await.is_err());
170 }
171
ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Copy172 fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Copy {
173 warp::ws().map(|ws: warp::ws::Ws| {
174 ws.on_upgrade(|websocket| {
175 // Just echo all messages back...
176 let (tx, rx) = websocket.split();
177 rx.inspect(|i| log::debug!("ws recv: {:?}", i))
178 .forward(tx)
179 .map(|_| ())
180 })
181 })
182 }
183