1 use futures::{future, prelude::*};
2 use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, RedisResult};
3
4 use crate::support::*;
5
6 mod support;
7
/// Stores two keys with `SET` (one from separate arguments, one from an array argument)
/// and checks that a single `MGET` returns both values, decoded as a `String` and as raw
/// bytes respectively.
#[test]
fn test_args() {
    let ctx = TestContext::new();

    let outcome = block_on_all(async {
        let mut con = ctx.async_connection().await?;

        // The replies to the two writes are not inspected, so ask for `()`.
        redis::cmd("SET")
            .arg("key1")
            .arg(b"foo")
            .query_async::<_, ()>(&mut con)
            .await?;
        redis::cmd("SET")
            .arg(&["key2", "bar"])
            .query_async::<_, ()>(&mut con)
            .await?;

        let fetched = redis::cmd("MGET")
            .arg(&["key1", "key2"])
            .query_async(&mut con)
            .await;
        assert_eq!(fetched, Ok(("foo".to_string(), b"bar".to_vec())));
        fetched
    });
    outcome.unwrap();
}
32
/// Checks that using a multiplexed connection after its `TestContext` has been dropped
/// yields an `IoError` instead of panicking.
#[test]
fn dont_panic_on_closed_multiplexed_connection() {
    let ctx = TestContext::new();
    let connect = ctx.multiplexed_async_connection();
    // The context is dropped before the connection future is driven below.
    drop(ctx);

    block_on_all(async move {
        let outcome: RedisResult<()> = async move {
            let con = connect.await?;

            // Each call issues the same `SET` on a fresh clone of the connection handle.
            let issue_set = move || {
                let mut handle = con.clone();
                async move {
                    redis::cmd("SET")
                        .arg("key1")
                        .arg(b"foo")
                        .query_async(&mut handle)
                        .await
                }
            };

            let first: RedisResult<()> = issue_set().await;
            let first_err = first.as_ref().unwrap_err();
            assert_eq!(first_err.kind(), redis::ErrorKind::IoError, "{}", first_err);

            // A second attempt must fail in the same way; its result is checked below.
            issue_set().await
        }
        .await;

        let err = outcome.as_ref().unwrap_err();
        assert_eq!(err.kind(), redis::ErrorKind::IoError, "{}", err);
    });
}
72
/// Runs two `SET`s followed by an `MGET` as one atomic pipeline and checks the values
/// read back.
#[test]
fn test_pipeline_transaction() {
    let ctx = TestContext::new();
    block_on_all(async move {
        let mut con = ctx.async_connection().await?;

        let mut pipe = redis::pipe();
        pipe.atomic();
        // Both `SET` replies are marked as ignored; the result is decoded as a one-element
        // tuple holding the `MGET` pair.
        pipe.cmd("SET").arg("key_1").arg(42).ignore();
        pipe.cmd("SET").arg("key_2").arg(43).ignore();
        pipe.cmd("MGET").arg(&["key_1", "key_2"]);

        let ((k1, k2),): ((i32, i32),) = pipe.query_async(&mut con).await?;
        assert_eq!(k1, 42);
        assert_eq!(k2, 43);
        Ok::<(), redis::RedisError>(())
    })
    .unwrap();
}
99
test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send100 fn test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send {
101 let mut con = con.clone();
102 async move {
103 let key = format!("key{}", i);
104 let key_2 = key.clone();
105 let key2 = format!("key{}_2", i);
106 let key2_2 = key2.clone();
107
108 let foo_val = format!("foo{}", i);
109
110 redis::cmd("SET")
111 .arg(&key[..])
112 .arg(foo_val.as_bytes())
113 .query_async(&mut con)
114 .await?;
115 redis::cmd("SET")
116 .arg(&[&key2, "bar"])
117 .query_async(&mut con)
118 .await?;
119 redis::cmd("MGET")
120 .arg(&[&key_2, &key2_2])
121 .query_async(&mut con)
122 .map(|result| {
123 assert_eq!(Ok((foo_val, b"bar".to_vec())), result);
124 Ok(())
125 })
126 .await
127 }
128 }
129
test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>>130 fn test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>> {
131 let mut con = con.clone();
132 async move {
133 redis::cmd("SET")
134 .query_async(&mut con)
135 .map(|result| match result {
136 Ok(()) => panic!("Expected redis to return an error"),
137 Err(_) => Ok(()),
138 })
139 .await
140 }
141 }
142
/// Drives 100 `test_cmd` futures together over one multiplexed connection and checks
/// that all of them complete.
#[test]
fn test_args_multiplexed_connection() {
    let ctx = TestContext::new();
    block_on_all(async move {
        let con = ctx
            .multiplexed_async_connection()
            .await
            .unwrap_or_else(|err| panic!("{}", err));

        let cmds = (0..100).map(move |i| test_cmd(&con, i));
        let results = future::try_join_all(cmds)
            .await
            .unwrap_or_else(|err| panic!("{}", err));
        assert_eq!(results.len(), 100);
    });
}
159
/// Drives 100 futures together over one multiplexed connection, alternating between
/// `test_cmd` (even indices) and `test_error` (odd indices), and checks that all of them
/// complete.
#[test]
fn test_args_with_errors_multiplexed_connection() {
    let ctx = TestContext::new();
    block_on_all(async move {
        let con = ctx
            .multiplexed_async_connection()
            .await
            .unwrap_or_else(|err| panic!("{}", err));

        let cmds = (0..100).map(move |i| {
            let con = con.clone();
            async move {
                match i % 2 {
                    0 => test_cmd(&con, i).await,
                    _ => test_error(&con).await,
                }
            }
        });
        let results = future::try_join_all(cmds)
            .await
            .unwrap_or_else(|err| panic!("{}", err));
        assert_eq!(results.len(), 100);
    });
}
185
/// Drives 100 atomic pipelines together over clones of one multiplexed connection.
///
/// Every pipeline writes its own values to the shared keys `key` and `key2` and then reads
/// them back, asserting that it sees exactly what it wrote.
#[test]
fn test_transaction_multiplexed_connection() {
    let ctx = TestContext::new();
    block_on_all(async move {
        let con = ctx
            .multiplexed_async_connection()
            .await
            .unwrap_or_else(|err| panic!("{}", err));

        let transactions = (0..100).map(move |i| {
            let mut con = con.clone();
            async move {
                let foo_val = i;
                let bar_val = format!("bar{}", i);

                let mut pipe = redis::pipe();
                pipe.atomic();
                pipe.cmd("SET").arg("key").arg(foo_val).ignore();
                pipe.cmd("SET").arg(&["key2", &bar_val[..]]).ignore();
                pipe.cmd("MGET").arg(&["key", "key2"]);

                let reply = pipe.query_async(&mut con).await;
                assert_eq!(Ok(((foo_val, bar_val.into_bytes()),)), reply);
                reply
            }
        });

        let results = future::try_join_all(transactions)
            .await
            .unwrap_or_else(|err| panic!("{}", err));
        assert_eq!(results.len(), 100);
    });
}
228
/// Adds the numbers `0..1000` to the set `foo`, walks the set with an async `SSCAN`
/// iterator and checks that every added member was yielded.
#[test]
fn test_async_scanning() {
    let ctx = TestContext::new();
    let outcome = block_on_all(async move {
        let mut con = ctx.multiplexed_async_connection().await?;

        // Members that have been written but not yet produced by the scan.
        let mut unseen = std::collections::HashSet::new();
        for x in 0..1000 {
            redis::cmd("SADD")
                .arg("foo")
                .arg(x)
                .query_async::<_, ()>(&mut con)
                .await?;
            unseen.insert(x);
        }

        let mut iter = redis::cmd("SSCAN")
            .arg("foo")
            .cursor_arg(0)
            .clone()
            .iter_async(&mut con)
            .await
            .unwrap();

        while let Some(item) = iter.next_item().await {
            // The binding's annotation is what fixes the iterator's item type.
            let member: usize = item;
            unseen.remove(&member);
        }

        assert_eq!(unseen.len(), 0);
        Ok::<(), redis::RedisError>(())
    });

    if let Err(err) = outcome {
        panic!("{}", err);
    }
}
270
/// Writes a key through one script and reads it back through another, twice in a row.
#[test]
#[cfg(feature = "script")]
fn test_script() {
    use redis::RedisError;

    // Note this test runs both scripts twice to test when they have already been loaded
    // into Redis and when they need to be loaded in
    let script1 = redis::Script::new("return redis.call('SET', KEYS[1], ARGV[1])");
    let script2 = redis::Script::new("return redis.call('GET', KEYS[1])");

    let ctx = TestContext::new();

    block_on_all(async move {
        let mut con = ctx.multiplexed_async_connection().await?;

        for expected in ["foo", "bar"].iter().copied() {
            script1
                .key("key1")
                .arg(expected)
                .invoke_async(&mut con)
                .await?;
            let val: String = script2.key("key1").invoke_async(&mut con).await?;
            assert_eq!(val, expected);
        }

        Ok::<(), RedisError>(())
    })
    .unwrap();
}
304
/// Invokes a script that returns a three-element table and checks that the reply decodes
/// into an `(i32, String, bool)` tuple with the expected values.
#[test]
#[cfg(feature = "script")]
fn test_script_returning_complex_type() {
    let ctx = TestContext::new();
    block_on_all(async {
        let mut con = ctx.multiplexed_async_connection().await?;

        let script = redis::Script::new("return {1, ARGV[1], true}");
        let (i, s, b): (i32, String, bool) =
            script.arg("hello").invoke_async(&mut con).await?;

        assert_eq!(i, 1);
        assert_eq!(s, "hello");
        assert_eq!(b, true);
        Ok::<(), redis::RedisError>(())
    })
    .unwrap();
}
323
324 // Allowing `nth(0)` for similarity with the following `nth(1)`.
325 // Allowing `let ()` as `query_async` requries the type it converts the result to.
326 #[allow(clippy::let_unit_value, clippy::iter_nth_zero)]
327 #[tokio::test]
io_error_on_kill_issue_320()328 async fn io_error_on_kill_issue_320() {
329 let ctx = TestContext::new();
330
331 let mut conn_to_kill = ctx.async_connection().await.unwrap();
332 cmd("CLIENT")
333 .arg("SETNAME")
334 .arg("to-kill")
335 .query_async::<_, ()>(&mut conn_to_kill)
336 .await
337 .unwrap();
338
339 let client_list: String = cmd("CLIENT")
340 .arg("LIST")
341 .query_async(&mut conn_to_kill)
342 .await
343 .unwrap();
344
345 eprintln!("{}", client_list);
346 let client_to_kill = client_list
347 .split('\n')
348 .find(|line| line.contains("to-kill"))
349 .expect("line")
350 .split(' ')
351 .nth(0)
352 .expect("id")
353 .split('=')
354 .nth(1)
355 .expect("id value");
356
357 let mut killer_conn = ctx.async_connection().await.unwrap();
358 let () = cmd("CLIENT")
359 .arg("KILL")
360 .arg("ID")
361 .arg(client_to_kill)
362 .query_async(&mut killer_conn)
363 .await
364 .unwrap();
365 let mut killed_client = conn_to_kill;
366
367 let err = loop {
368 match killed_client.get::<_, Option<String>>("a").await {
369 // We are racing against the server being shutdown so try until we a get an io error
370 Ok(_) => tokio::time::delay_for(std::time::Duration::from_millis(50)).await,
371 Err(err) => break err,
372 }
373 };
374 assert_eq!(err.kind(), ErrorKind::IoError); // Shouldn't this be IoError?
375 }
376
377 #[tokio::test]
invalid_password_issue_343()378 async fn invalid_password_issue_343() {
379 let ctx = TestContext::new();
380 let coninfo = redis::ConnectionInfo {
381 addr: Box::new(ctx.server.get_client_addr().clone()),
382 db: 0,
383 username: None,
384 passwd: Some("asdcasc".to_string()),
385 };
386 let client = redis::Client::open(coninfo).unwrap();
387 let err = client
388 .get_multiplexed_tokio_connection()
389 .await
390 .err()
391 .unwrap();
392 assert_eq!(
393 err.kind(),
394 ErrorKind::AuthenticationFailed,
395 "Unexpected error: {}",
396 err
397 );
398 }
399
400 mod pub_sub {
401 use std::collections::HashMap;
402 use std::time::Duration;
403
404 use super::*;
405
406 #[test]
pub_sub_subscription()407 fn pub_sub_subscription() {
408 use redis::RedisError;
409
410 let ctx = TestContext::new();
411 block_on_all(async move {
412 let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
413 pubsub_conn.subscribe("phonewave").await?;
414 let mut pubsub_stream = pubsub_conn.on_message();
415 let mut publish_conn = ctx.async_connection().await?;
416 publish_conn.publish("phonewave", "banana").await?;
417
418 let msg_payload: String = pubsub_stream.next().await.unwrap().get_payload()?;
419 assert_eq!("banana".to_string(), msg_payload);
420
421 Ok(())
422 })
423 .map_err(|err: RedisError| err)
424 .unwrap();
425 }
426
427 #[test]
pub_sub_unsubscription()428 fn pub_sub_unsubscription() {
429 use redis::RedisError;
430
431 const SUBSCRIPTION_KEY: &str = "phonewave-pub-sub-unsubscription";
432
433 let ctx = TestContext::new();
434 block_on_all(async move {
435 let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
436 pubsub_conn.subscribe(SUBSCRIPTION_KEY).await?;
437 pubsub_conn.unsubscribe(SUBSCRIPTION_KEY).await?;
438
439 let mut conn = ctx.async_connection().await?;
440 let subscriptions_counts: HashMap<String, u32> = redis::cmd("PUBSUB")
441 .arg("NUMSUB")
442 .arg(SUBSCRIPTION_KEY)
443 .query_async(&mut conn)
444 .await?;
445 let subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap();
446 assert_eq!(subscription_count, 0);
447
448 Ok(())
449 })
450 .map_err(|err: RedisError| err)
451 .unwrap();
452 }
453
454 #[test]
automatic_unsubscription()455 fn automatic_unsubscription() {
456 use redis::RedisError;
457
458 const SUBSCRIPTION_KEY: &str = "phonewave-automatic-unsubscription";
459
460 let ctx = TestContext::new();
461 block_on_all(async move {
462 let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
463 pubsub_conn.subscribe(SUBSCRIPTION_KEY).await?;
464 drop(pubsub_conn);
465
466 std::thread::sleep(Duration::from_millis(50));
467
468 let mut conn = ctx.async_connection().await?;
469 let subscriptions_counts: HashMap<String, u32> = redis::cmd("PUBSUB")
470 .arg("NUMSUB")
471 .arg(SUBSCRIPTION_KEY)
472 .query_async(&mut conn)
473 .await?;
474 let subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap();
475 assert_eq!(subscription_count, 0);
476
477 Ok(())
478 })
479 .map_err(|err: RedisError| err)
480 .unwrap();
481 }
482
483 #[test]
pub_sub_conn_reuse()484 fn pub_sub_conn_reuse() {
485 use redis::RedisError;
486
487 let ctx = TestContext::new();
488 block_on_all(async move {
489 let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
490 pubsub_conn.subscribe("phonewave").await?;
491 pubsub_conn.psubscribe("*").await?;
492
493 let mut conn = pubsub_conn.into_connection().await;
494 redis::cmd("SET")
495 .arg("foo")
496 .arg("bar")
497 .query_async(&mut conn)
498 .await?;
499
500 let res: String = redis::cmd("GET").arg("foo").query_async(&mut conn).await?;
501 assert_eq!(&res, "bar");
502
503 Ok(())
504 })
505 .map_err(|err: RedisError| err)
506 .unwrap();
507 }
508 }
509