1 use futures::{future, prelude::*};
2 use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, RedisResult};
3 
4 use crate::support::*;
5 
6 mod support;
7 
8 #[test]
test_args()9 fn test_args() {
10     let ctx = TestContext::new();
11     let connect = ctx.async_connection();
12 
13     block_on_all(connect.and_then(|mut con| async move {
14         redis::cmd("SET")
15             .arg("key1")
16             .arg(b"foo")
17             .query_async(&mut con)
18             .await?;
19         redis::cmd("SET")
20             .arg(&["key2", "bar"])
21             .query_async(&mut con)
22             .await?;
23         let result = redis::cmd("MGET")
24             .arg(&["key1", "key2"])
25             .query_async(&mut con)
26             .await;
27         assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
28         result
29     }))
30     .unwrap();
31 }
32 
33 #[test]
dont_panic_on_closed_multiplexed_connection()34 fn dont_panic_on_closed_multiplexed_connection() {
35     let ctx = TestContext::new();
36     let connect = ctx.multiplexed_async_connection();
37     drop(ctx);
38 
39     block_on_all(async move {
40         connect
41             .and_then(|con| async move {
42                 let cmd = move || {
43                     let mut con = con.clone();
44                     async move {
45                         redis::cmd("SET")
46                             .arg("key1")
47                             .arg(b"foo")
48                             .query_async(&mut con)
49                             .await
50                     }
51                 };
52                 let result: RedisResult<()> = cmd().await;
53                 assert_eq!(
54                     result.as_ref().unwrap_err().kind(),
55                     redis::ErrorKind::IoError,
56                     "{}",
57                     result.as_ref().unwrap_err()
58                 );
59                 cmd().await
60             })
61             .map(|result| {
62                 assert_eq!(
63                     result.as_ref().unwrap_err().kind(),
64                     redis::ErrorKind::IoError,
65                     "{}",
66                     result.as_ref().unwrap_err()
67                 );
68             })
69             .await
70     });
71 }
72 
73 #[test]
test_pipeline_transaction()74 fn test_pipeline_transaction() {
75     let ctx = TestContext::new();
76     block_on_all(async move {
77         let mut con = ctx.async_connection().await?;
78         let mut pipe = redis::pipe();
79         pipe.atomic()
80             .cmd("SET")
81             .arg("key_1")
82             .arg(42)
83             .ignore()
84             .cmd("SET")
85             .arg("key_2")
86             .arg(43)
87             .ignore()
88             .cmd("MGET")
89             .arg(&["key_1", "key_2"]);
90         pipe.query_async(&mut con)
91             .map_ok(|((k1, k2),): ((i32, i32),)| {
92                 assert_eq!(k1, 42);
93                 assert_eq!(k2, 43);
94             })
95             .await
96     })
97     .unwrap();
98 }
99 
test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send100 fn test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send {
101     let mut con = con.clone();
102     async move {
103         let key = format!("key{}", i);
104         let key_2 = key.clone();
105         let key2 = format!("key{}_2", i);
106         let key2_2 = key2.clone();
107 
108         let foo_val = format!("foo{}", i);
109 
110         redis::cmd("SET")
111             .arg(&key[..])
112             .arg(foo_val.as_bytes())
113             .query_async(&mut con)
114             .await?;
115         redis::cmd("SET")
116             .arg(&[&key2, "bar"])
117             .query_async(&mut con)
118             .await?;
119         redis::cmd("MGET")
120             .arg(&[&key_2, &key2_2])
121             .query_async(&mut con)
122             .map(|result| {
123                 assert_eq!(Ok((foo_val, b"bar".to_vec())), result);
124                 Ok(())
125             })
126             .await
127     }
128 }
129 
test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>>130 fn test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>> {
131     let mut con = con.clone();
132     async move {
133         redis::cmd("SET")
134             .query_async(&mut con)
135             .map(|result| match result {
136                 Ok(()) => panic!("Expected redis to return an error"),
137                 Err(_) => Ok(()),
138             })
139             .await
140     }
141 }
142 
143 #[test]
test_args_multiplexed_connection()144 fn test_args_multiplexed_connection() {
145     let ctx = TestContext::new();
146     block_on_all(async move {
147         ctx.multiplexed_async_connection()
148             .and_then(|con| {
149                 let cmds = (0..100).map(move |i| test_cmd(&con, i));
150                 future::try_join_all(cmds).map_ok(|results| {
151                     assert_eq!(results.len(), 100);
152                 })
153             })
154             .map_err(|err| panic!("{}", err))
155             .await
156     })
157     .unwrap();
158 }
159 
160 #[test]
test_args_with_errors_multiplexed_connection()161 fn test_args_with_errors_multiplexed_connection() {
162     let ctx = TestContext::new();
163     block_on_all(async move {
164         ctx.multiplexed_async_connection()
165             .and_then(|con| {
166                 let cmds = (0..100).map(move |i| {
167                     let con = con.clone();
168                     async move {
169                         if i % 2 == 0 {
170                             test_cmd(&con, i).await
171                         } else {
172                             test_error(&con).await
173                         }
174                     }
175                 });
176                 future::try_join_all(cmds).map_ok(|results| {
177                     assert_eq!(results.len(), 100);
178                 })
179             })
180             .map_err(|err| panic!("{}", err))
181             .await
182     })
183     .unwrap();
184 }
185 
186 #[test]
test_transaction_multiplexed_connection()187 fn test_transaction_multiplexed_connection() {
188     let ctx = TestContext::new();
189     block_on_all(async move {
190         ctx.multiplexed_async_connection()
191             .and_then(|con| {
192                 let cmds = (0..100).map(move |i| {
193                     let mut con = con.clone();
194                     async move {
195                         let foo_val = i;
196                         let bar_val = format!("bar{}", i);
197 
198                         let mut pipe = redis::pipe();
199                         pipe.atomic()
200                             .cmd("SET")
201                             .arg("key")
202                             .arg(foo_val)
203                             .ignore()
204                             .cmd("SET")
205                             .arg(&["key2", &bar_val[..]])
206                             .ignore()
207                             .cmd("MGET")
208                             .arg(&["key", "key2"]);
209 
210                         pipe.query_async(&mut con)
211                             .map(move |result| {
212                                 assert_eq!(Ok(((foo_val, bar_val.into_bytes()),)), result);
213                                 result
214                             })
215                             .await
216                     }
217                 });
218                 future::try_join_all(cmds)
219             })
220             .map_ok(|results| {
221                 assert_eq!(results.len(), 100);
222             })
223             .map_err(|err| panic!("{}", err))
224             .await
225     })
226     .unwrap();
227 }
228 
229 #[test]
test_async_scanning()230 fn test_async_scanning() {
231     let ctx = TestContext::new();
232     block_on_all(async move {
233         ctx.multiplexed_async_connection()
234             .and_then(|mut con| {
235                 async move {
236                     let mut unseen = std::collections::HashSet::new();
237 
238                     for x in 0..1000 {
239                         redis::cmd("SADD")
240                             .arg("foo")
241                             .arg(x)
242                             .query_async(&mut con)
243                             .await?;
244                         unseen.insert(x);
245                     }
246 
247                     let mut iter = redis::cmd("SSCAN")
248                         .arg("foo")
249                         .cursor_arg(0)
250                         .clone()
251                         .iter_async(&mut con)
252                         .await
253                         .unwrap();
254 
255                     while let Some(x) = iter.next_item().await {
256                         // type inference limitations
257                         let x: usize = x;
258                         unseen.remove(&x);
259                     }
260 
261                     assert_eq!(unseen.len(), 0);
262                     Ok(())
263                 }
264             })
265             .map_err(|err| panic!("{}", err))
266             .await
267     })
268     .unwrap();
269 }
270 
271 #[test]
272 #[cfg(feature = "script")]
test_script()273 fn test_script() {
274     use redis::RedisError;
275 
276     // Note this test runs both scripts twice to test when they have already been loaded
277     // into Redis and when they need to be loaded in
278     let script1 = redis::Script::new("return redis.call('SET', KEYS[1], ARGV[1])");
279     let script2 = redis::Script::new("return redis.call('GET', KEYS[1])");
280 
281     let ctx = TestContext::new();
282 
283     block_on_all(async move {
284         let mut con = ctx.multiplexed_async_connection().await?;
285         script1
286             .key("key1")
287             .arg("foo")
288             .invoke_async(&mut con)
289             .await?;
290         let val: String = script2.key("key1").invoke_async(&mut con).await?;
291         assert_eq!(val, "foo");
292         script1
293             .key("key1")
294             .arg("bar")
295             .invoke_async(&mut con)
296             .await?;
297         let val: String = script2.key("key1").invoke_async(&mut con).await?;
298         assert_eq!(val, "bar");
299         Ok(())
300     })
301     .map_err(|err: RedisError| err)
302     .unwrap();
303 }
304 
305 #[test]
306 #[cfg(feature = "script")]
test_script_returning_complex_type()307 fn test_script_returning_complex_type() {
308     let ctx = TestContext::new();
309     block_on_all(async {
310         let mut con = ctx.multiplexed_async_connection().await?;
311         redis::Script::new("return {1, ARGV[1], true}")
312             .arg("hello")
313             .invoke_async(&mut con)
314             .map_ok(|(i, s, b): (i32, String, bool)| {
315                 assert_eq!(i, 1);
316                 assert_eq!(s, "hello");
317                 assert_eq!(b, true);
318             })
319             .await
320     })
321     .unwrap();
322 }
323 
324 // Allowing `nth(0)` for similarity with the following `nth(1)`.
325 // Allowing `let ()` as `query_async` requries the type it converts the result to.
326 #[allow(clippy::let_unit_value, clippy::iter_nth_zero)]
327 #[tokio::test]
io_error_on_kill_issue_320()328 async fn io_error_on_kill_issue_320() {
329     let ctx = TestContext::new();
330 
331     let mut conn_to_kill = ctx.async_connection().await.unwrap();
332     cmd("CLIENT")
333         .arg("SETNAME")
334         .arg("to-kill")
335         .query_async::<_, ()>(&mut conn_to_kill)
336         .await
337         .unwrap();
338 
339     let client_list: String = cmd("CLIENT")
340         .arg("LIST")
341         .query_async(&mut conn_to_kill)
342         .await
343         .unwrap();
344 
345     eprintln!("{}", client_list);
346     let client_to_kill = client_list
347         .split('\n')
348         .find(|line| line.contains("to-kill"))
349         .expect("line")
350         .split(' ')
351         .nth(0)
352         .expect("id")
353         .split('=')
354         .nth(1)
355         .expect("id value");
356 
357     let mut killer_conn = ctx.async_connection().await.unwrap();
358     let () = cmd("CLIENT")
359         .arg("KILL")
360         .arg("ID")
361         .arg(client_to_kill)
362         .query_async(&mut killer_conn)
363         .await
364         .unwrap();
365     let mut killed_client = conn_to_kill;
366 
367     let err = loop {
368         match killed_client.get::<_, Option<String>>("a").await {
369             // We are racing against the server being shutdown so try until we a get an io error
370             Ok(_) => tokio::time::delay_for(std::time::Duration::from_millis(50)).await,
371             Err(err) => break err,
372         }
373     };
374     assert_eq!(err.kind(), ErrorKind::IoError); // Shouldn't this be IoError?
375 }
376 
377 #[tokio::test]
invalid_password_issue_343()378 async fn invalid_password_issue_343() {
379     let ctx = TestContext::new();
380     let coninfo = redis::ConnectionInfo {
381         addr: Box::new(ctx.server.get_client_addr().clone()),
382         db: 0,
383         username: None,
384         passwd: Some("asdcasc".to_string()),
385     };
386     let client = redis::Client::open(coninfo).unwrap();
387     let err = client
388         .get_multiplexed_tokio_connection()
389         .await
390         .err()
391         .unwrap();
392     assert_eq!(
393         err.kind(),
394         ErrorKind::AuthenticationFailed,
395         "Unexpected error: {}",
396         err
397     );
398 }
399 
400 mod pub_sub {
401     use std::collections::HashMap;
402     use std::time::Duration;
403 
404     use super::*;
405 
406     #[test]
pub_sub_subscription()407     fn pub_sub_subscription() {
408         use redis::RedisError;
409 
410         let ctx = TestContext::new();
411         block_on_all(async move {
412             let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
413             pubsub_conn.subscribe("phonewave").await?;
414             let mut pubsub_stream = pubsub_conn.on_message();
415             let mut publish_conn = ctx.async_connection().await?;
416             publish_conn.publish("phonewave", "banana").await?;
417 
418             let msg_payload: String = pubsub_stream.next().await.unwrap().get_payload()?;
419             assert_eq!("banana".to_string(), msg_payload);
420 
421             Ok(())
422         })
423         .map_err(|err: RedisError| err)
424         .unwrap();
425     }
426 
427     #[test]
pub_sub_unsubscription()428     fn pub_sub_unsubscription() {
429         use redis::RedisError;
430 
431         const SUBSCRIPTION_KEY: &str = "phonewave-pub-sub-unsubscription";
432 
433         let ctx = TestContext::new();
434         block_on_all(async move {
435             let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
436             pubsub_conn.subscribe(SUBSCRIPTION_KEY).await?;
437             pubsub_conn.unsubscribe(SUBSCRIPTION_KEY).await?;
438 
439             let mut conn = ctx.async_connection().await?;
440             let subscriptions_counts: HashMap<String, u32> = redis::cmd("PUBSUB")
441                 .arg("NUMSUB")
442                 .arg(SUBSCRIPTION_KEY)
443                 .query_async(&mut conn)
444                 .await?;
445             let subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap();
446             assert_eq!(subscription_count, 0);
447 
448             Ok(())
449         })
450         .map_err(|err: RedisError| err)
451         .unwrap();
452     }
453 
454     #[test]
automatic_unsubscription()455     fn automatic_unsubscription() {
456         use redis::RedisError;
457 
458         const SUBSCRIPTION_KEY: &str = "phonewave-automatic-unsubscription";
459 
460         let ctx = TestContext::new();
461         block_on_all(async move {
462             let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
463             pubsub_conn.subscribe(SUBSCRIPTION_KEY).await?;
464             drop(pubsub_conn);
465 
466             std::thread::sleep(Duration::from_millis(50));
467 
468             let mut conn = ctx.async_connection().await?;
469             let subscriptions_counts: HashMap<String, u32> = redis::cmd("PUBSUB")
470                 .arg("NUMSUB")
471                 .arg(SUBSCRIPTION_KEY)
472                 .query_async(&mut conn)
473                 .await?;
474             let subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap();
475             assert_eq!(subscription_count, 0);
476 
477             Ok(())
478         })
479         .map_err(|err: RedisError| err)
480         .unwrap();
481     }
482 
483     #[test]
pub_sub_conn_reuse()484     fn pub_sub_conn_reuse() {
485         use redis::RedisError;
486 
487         let ctx = TestContext::new();
488         block_on_all(async move {
489             let mut pubsub_conn = ctx.async_connection().await?.into_pubsub();
490             pubsub_conn.subscribe("phonewave").await?;
491             pubsub_conn.psubscribe("*").await?;
492 
493             let mut conn = pubsub_conn.into_connection().await;
494             redis::cmd("SET")
495                 .arg("foo")
496                 .arg("bar")
497                 .query_async(&mut conn)
498                 .await?;
499 
500             let res: String = redis::cmd("GET").arg("foo").query_async(&mut conn).await?;
501             assert_eq!(&res, "bar");
502 
503             Ok(())
504         })
505         .map_err(|err: RedisError| err)
506         .unwrap();
507     }
508 }
509