1 #![deny(warnings)]
2 
3 use futures::{FutureExt, SinkExt, StreamExt};
4 use warp::ws::Message;
5 use warp::Filter;
6 
7 #[tokio::test]
upgrade()8 async fn upgrade() {
9     let _ = pretty_env_logger::try_init();
10 
11     let route = warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|_| async {}));
12 
13     // From https://tools.ietf.org/html/rfc6455#section-1.2
14     let key = "dGhlIHNhbXBsZSBub25jZQ==";
15     let accept = "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=";
16 
17     let resp = warp::test::request()
18         .header("connection", "upgrade")
19         .header("upgrade", "websocket")
20         .header("sec-websocket-version", "13")
21         .header("sec-websocket-key", key)
22         .reply(&route)
23         .await;
24 
25     assert_eq!(resp.status(), 101);
26     assert_eq!(resp.headers()["connection"], "upgrade");
27     assert_eq!(resp.headers()["upgrade"], "websocket");
28     assert_eq!(resp.headers()["sec-websocket-accept"], accept);
29 
30     let resp = warp::test::request()
31         .header("connection", "keep-alive, Upgrade")
32         .header("upgrade", "Websocket")
33         .header("sec-websocket-version", "13")
34         .header("sec-websocket-key", key)
35         .reply(&route)
36         .await;
37 
38     assert_eq!(resp.status(), 101);
39 }
40 
41 #[tokio::test]
fail()42 async fn fail() {
43     let _ = pretty_env_logger::try_init();
44 
45     let route = warp::any().map(warp::reply);
46 
47     warp::test::ws()
48         .handshake(route)
49         .await
50         .expect_err("handshake non-websocket route should fail");
51 }
52 
53 #[tokio::test]
text()54 async fn text() {
55     let _ = pretty_env_logger::try_init();
56 
57     let mut client = warp::test::ws()
58         .handshake(ws_echo())
59         .await
60         .expect("handshake");
61 
62     client.send_text("hello warp").await;
63 
64     let msg = client.recv().await.expect("recv");
65     assert_eq!(msg.to_str(), Ok("hello warp"));
66 }
67 
68 #[tokio::test]
binary()69 async fn binary() {
70     let _ = pretty_env_logger::try_init();
71 
72     let mut client = warp::test::ws()
73         .handshake(ws_echo())
74         .await
75         .expect("handshake");
76 
77     client.send(warp::ws::Message::binary(&b"bonk"[..])).await;
78     let msg = client.recv().await.expect("recv");
79     assert!(msg.is_binary());
80     assert_eq!(msg.as_bytes(), &b"bonk"[..]);
81 }
82 
83 #[tokio::test]
send_ping()84 async fn send_ping() {
85     let _ = pretty_env_logger::try_init();
86 
87     let filter = warp::ws().map(|ws: warp::ws::Ws| {
88         ws.on_upgrade(|mut websocket| {
89             async move {
90                 websocket.send(Message::ping("srv")).await.unwrap();
91                 // assume the client will pong back
92                 let msg = websocket.next().await.expect("item").expect("ok");
93                 assert!(msg.is_pong());
94                 assert_eq!(msg.as_bytes(), &b"srv"[..]);
95             }
96         })
97     });
98 
99     let mut client = warp::test::ws().handshake(filter).await.expect("handshake");
100 
101     let msg = client.recv().await.expect("recv");
102     assert!(msg.is_ping());
103     assert_eq!(msg.as_bytes(), &b"srv"[..]);
104 
105     client.recv_closed().await.expect("closed");
106 }
107 
108 #[tokio::test]
echo_pings()109 async fn echo_pings() {
110     let _ = pretty_env_logger::try_init();
111 
112     let mut client = warp::test::ws()
113         .handshake(ws_echo())
114         .await
115         .expect("handshake");
116 
117     client.send(Message::ping("clt")).await;
118 
119     // tungstenite sends the PONG first
120     let msg = client.recv().await.expect("recv");
121     assert!(msg.is_pong());
122     assert_eq!(msg.as_bytes(), &b"clt"[..]);
123 
124     // and then `ws_echo` sends us back the same PING
125     let msg = client.recv().await.expect("recv");
126     assert!(msg.is_ping());
127     assert_eq!(msg.as_bytes(), &b"clt"[..]);
128 
129     // and then our client would have sent *its* PONG
130     // and `ws_echo` would send *that* back too
131     let msg = client.recv().await.expect("recv");
132     assert!(msg.is_pong());
133     assert_eq!(msg.as_bytes(), &b"clt"[..]);
134 }
135 
136 #[tokio::test]
closed()137 async fn closed() {
138     let _ = pretty_env_logger::try_init();
139 
140     let route =
141         warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|websocket| websocket.close().map(|_| ())));
142 
143     let mut client = warp::test::ws().handshake(route).await.expect("handshake");
144 
145     client.recv_closed().await.expect("closed");
146 }
147 
148 #[tokio::test]
limit_message_size()149 async fn limit_message_size() {
150     let _ = pretty_env_logger::try_init();
151 
152     let echo = warp::ws().map(|ws: warp::ws::Ws| {
153         ws.max_message_size(1024).on_upgrade(|websocket| {
154             // Just echo all messages back...
155             let (tx, rx) = websocket.split();
156             rx.forward(tx).map(|result| {
157                 assert!(result.is_err());
158                 assert_eq!(
159                     format!("{}", result.unwrap_err()).as_str(),
160                     "Space limit exceeded: Message too big: 0 + 1025 > 1024"
161                 );
162             })
163         })
164     });
165     let mut client = warp::test::ws().handshake(echo).await.expect("handshake");
166 
167     client.send(warp::ws::Message::binary(vec![0; 1025])).await;
168     client.send_text("hello warp").await;
169     assert!(client.recv().await.is_err());
170 }
171 
ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Copy172 fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Copy {
173     warp::ws().map(|ws: warp::ws::Ws| {
174         ws.on_upgrade(|websocket| {
175             // Just echo all messages back...
176             let (tx, rx) = websocket.split();
177             rx.inspect(|i| log::debug!("ws recv: {:?}", i))
178                 .forward(tx)
179                 .map(|_| ())
180         })
181     })
182 }
183