1 use futures::{future, prelude::*};
2 
3 use crate::support::*;
4 
5 use redis::{aio::MultiplexedConnection, RedisResult};
6 
7 mod support;
8 
9 #[test]
test_args()10 fn test_args() {
11     let ctx = TestContext::new();
12     let connect = ctx.async_connection_async_std();
13 
14     block_on_all_using_async_std(connect.and_then(|mut con| async move {
15         redis::cmd("SET")
16             .arg("key1")
17             .arg(b"foo")
18             .query_async(&mut con)
19             .await?;
20         redis::cmd("SET")
21             .arg(&["key2", "bar"])
22             .query_async(&mut con)
23             .await?;
24         let result = redis::cmd("MGET")
25             .arg(&["key1", "key2"])
26             .query_async(&mut con)
27             .await;
28         assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
29         result
30     }))
31     .unwrap();
32 }
33 
34 #[test]
test_args_async_std()35 fn test_args_async_std() {
36     let ctx = TestContext::new();
37     let connect = ctx.async_connection_async_std();
38 
39     block_on_all_using_async_std(connect.and_then(|mut con| async move {
40         redis::cmd("SET")
41             .arg("key1")
42             .arg(b"foo")
43             .query_async(&mut con)
44             .await?;
45         redis::cmd("SET")
46             .arg(&["key2", "bar"])
47             .query_async(&mut con)
48             .await?;
49         let result = redis::cmd("MGET")
50             .arg(&["key1", "key2"])
51             .query_async(&mut con)
52             .await;
53         assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
54         result
55     }))
56     .unwrap();
57 }
58 
59 #[test]
dont_panic_on_closed_multiplexed_connection()60 fn dont_panic_on_closed_multiplexed_connection() {
61     let ctx = TestContext::new();
62     let connect = ctx.multiplexed_async_connection_async_std();
63     drop(ctx);
64 
65     block_on_all_using_async_std(async move {
66         connect
67             .and_then(|con| async move {
68                 let cmd = move || {
69                     let mut con = con.clone();
70                     async move {
71                         redis::cmd("SET")
72                             .arg("key1")
73                             .arg(b"foo")
74                             .query_async(&mut con)
75                             .await
76                     }
77                 };
78                 let result: RedisResult<()> = cmd().await;
79                 assert_eq!(
80                     result.as_ref().unwrap_err().kind(),
81                     redis::ErrorKind::IoError,
82                     "{}",
83                     result.as_ref().unwrap_err()
84                 );
85                 cmd().await
86             })
87             .map(|result| {
88                 assert_eq!(
89                     result.as_ref().unwrap_err().kind(),
90                     redis::ErrorKind::IoError,
91                     "{}",
92                     result.as_ref().unwrap_err()
93                 );
94             })
95             .await
96     });
97 }
98 
99 #[test]
test_pipeline_transaction()100 fn test_pipeline_transaction() {
101     let ctx = TestContext::new();
102     block_on_all_using_async_std(async move {
103         let mut con = ctx.async_connection_async_std().await?;
104         let mut pipe = redis::pipe();
105         pipe.atomic()
106             .cmd("SET")
107             .arg("key_1")
108             .arg(42)
109             .ignore()
110             .cmd("SET")
111             .arg("key_2")
112             .arg(43)
113             .ignore()
114             .cmd("MGET")
115             .arg(&["key_1", "key_2"]);
116         pipe.query_async(&mut con)
117             .map_ok(|((k1, k2),): ((i32, i32),)| {
118                 assert_eq!(k1, 42);
119                 assert_eq!(k2, 43);
120             })
121             .await
122     })
123     .unwrap();
124 }
125 
test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send126 fn test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send {
127     let mut con = con.clone();
128     async move {
129         let key = format!("key{}", i);
130         let key_2 = key.clone();
131         let key2 = format!("key{}_2", i);
132         let key2_2 = key2.clone();
133 
134         let foo_val = format!("foo{}", i);
135 
136         redis::cmd("SET")
137             .arg(&key[..])
138             .arg(foo_val.as_bytes())
139             .query_async(&mut con)
140             .await?;
141         redis::cmd("SET")
142             .arg(&[&key2, "bar"])
143             .query_async(&mut con)
144             .await?;
145         redis::cmd("MGET")
146             .arg(&[&key_2, &key2_2])
147             .query_async(&mut con)
148             .map(|result| {
149                 assert_eq!(Ok((foo_val, b"bar".to_vec())), result);
150                 Ok(())
151             })
152             .await
153     }
154 }
155 
test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>>156 fn test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>> {
157     let mut con = con.clone();
158     async move {
159         redis::cmd("SET")
160             .query_async(&mut con)
161             .map(|result| match result {
162                 Ok(()) => panic!("Expected redis to return an error"),
163                 Err(_) => Ok(()),
164             })
165             .await
166     }
167 }
168 
169 #[test]
test_args_multiplexed_connection()170 fn test_args_multiplexed_connection() {
171     let ctx = TestContext::new();
172     block_on_all_using_async_std(async move {
173         ctx.multiplexed_async_connection_async_std()
174             .and_then(|con| {
175                 let cmds = (0..100).map(move |i| test_cmd(&con, i));
176                 future::try_join_all(cmds).map_ok(|results| {
177                     assert_eq!(results.len(), 100);
178                 })
179             })
180             .map_err(|err| panic!("{}", err))
181             .await
182     })
183     .unwrap();
184 }
185 
186 #[test]
test_args_with_errors_multiplexed_connection()187 fn test_args_with_errors_multiplexed_connection() {
188     let ctx = TestContext::new();
189     block_on_all_using_async_std(async move {
190         ctx.multiplexed_async_connection_async_std()
191             .and_then(|con| {
192                 let cmds = (0..100).map(move |i| {
193                     let con = con.clone();
194                     async move {
195                         if i % 2 == 0 {
196                             test_cmd(&con, i).await
197                         } else {
198                             test_error(&con).await
199                         }
200                     }
201                 });
202                 future::try_join_all(cmds).map_ok(|results| {
203                     assert_eq!(results.len(), 100);
204                 })
205             })
206             .map_err(|err| panic!("{}", err))
207             .await
208     })
209     .unwrap();
210 }
211 
212 #[test]
test_transaction_multiplexed_connection()213 fn test_transaction_multiplexed_connection() {
214     let ctx = TestContext::new();
215     block_on_all_using_async_std(async move {
216         ctx.multiplexed_async_connection_async_std()
217             .and_then(|con| {
218                 let cmds = (0..100).map(move |i| {
219                     let mut con = con.clone();
220                     async move {
221                         let foo_val = i;
222                         let bar_val = format!("bar{}", i);
223 
224                         let mut pipe = redis::pipe();
225                         pipe.atomic()
226                             .cmd("SET")
227                             .arg("key")
228                             .arg(foo_val)
229                             .ignore()
230                             .cmd("SET")
231                             .arg(&["key2", &bar_val[..]])
232                             .ignore()
233                             .cmd("MGET")
234                             .arg(&["key", "key2"]);
235 
236                         pipe.query_async(&mut con)
237                             .map(move |result| {
238                                 assert_eq!(Ok(((foo_val, bar_val.into_bytes()),)), result);
239                                 result
240                             })
241                             .await
242                     }
243                 });
244                 future::try_join_all(cmds)
245             })
246             .map_ok(|results| {
247                 assert_eq!(results.len(), 100);
248             })
249             .map_err(|err| panic!("{}", err))
250             .await
251     })
252     .unwrap();
253 }
254 
255 #[test]
256 #[cfg(feature = "script")]
test_script()257 fn test_script() {
258     use redis::RedisError;
259 
260     // Note this test runs both scripts twice to test when they have already been loaded
261     // into Redis and when they need to be loaded in
262     let script1 = redis::Script::new("return redis.call('SET', KEYS[1], ARGV[1])");
263     let script2 = redis::Script::new("return redis.call('GET', KEYS[1])");
264 
265     let ctx = TestContext::new();
266 
267     block_on_all_using_async_std(async move {
268         let mut con = ctx.multiplexed_async_connection_async_std().await?;
269         script1
270             .key("key1")
271             .arg("foo")
272             .invoke_async(&mut con)
273             .await?;
274         let val: String = script2.key("key1").invoke_async(&mut con).await?;
275         assert_eq!(val, "foo");
276         script1
277             .key("key1")
278             .arg("bar")
279             .invoke_async(&mut con)
280             .await?;
281         let val: String = script2.key("key1").invoke_async(&mut con).await?;
282         assert_eq!(val, "bar");
283         Ok(())
284     })
285     .map_err(|err: RedisError| err)
286     .unwrap();
287 }
288 
289 #[test]
290 #[cfg(feature = "script")]
test_script_returning_complex_type()291 fn test_script_returning_complex_type() {
292     let ctx = TestContext::new();
293     block_on_all_using_async_std(async {
294         let mut con = ctx.multiplexed_async_connection_async_std().await?;
295         redis::Script::new("return {1, ARGV[1], true}")
296             .arg("hello")
297             .invoke_async(&mut con)
298             .map_ok(|(i, s, b): (i32, String, bool)| {
299                 assert_eq!(i, 1);
300                 assert_eq!(s, "hello");
301                 assert_eq!(b, true);
302             })
303             .await
304     })
305     .unwrap();
306 }
307