1 use futures::{future, prelude::*};
2
3 use crate::support::*;
4
5 use redis::{aio::MultiplexedConnection, RedisResult};
6
7 mod support;
8
9 #[test]
test_args()10 fn test_args() {
11 let ctx = TestContext::new();
12 let connect = ctx.async_connection_async_std();
13
14 block_on_all_using_async_std(connect.and_then(|mut con| async move {
15 redis::cmd("SET")
16 .arg("key1")
17 .arg(b"foo")
18 .query_async(&mut con)
19 .await?;
20 redis::cmd("SET")
21 .arg(&["key2", "bar"])
22 .query_async(&mut con)
23 .await?;
24 let result = redis::cmd("MGET")
25 .arg(&["key1", "key2"])
26 .query_async(&mut con)
27 .await;
28 assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
29 result
30 }))
31 .unwrap();
32 }
33
34 #[test]
test_args_async_std()35 fn test_args_async_std() {
36 let ctx = TestContext::new();
37 let connect = ctx.async_connection_async_std();
38
39 block_on_all_using_async_std(connect.and_then(|mut con| async move {
40 redis::cmd("SET")
41 .arg("key1")
42 .arg(b"foo")
43 .query_async(&mut con)
44 .await?;
45 redis::cmd("SET")
46 .arg(&["key2", "bar"])
47 .query_async(&mut con)
48 .await?;
49 let result = redis::cmd("MGET")
50 .arg(&["key1", "key2"])
51 .query_async(&mut con)
52 .await;
53 assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
54 result
55 }))
56 .unwrap();
57 }
58
59 #[test]
dont_panic_on_closed_multiplexed_connection()60 fn dont_panic_on_closed_multiplexed_connection() {
61 let ctx = TestContext::new();
62 let connect = ctx.multiplexed_async_connection_async_std();
63 drop(ctx);
64
65 block_on_all_using_async_std(async move {
66 connect
67 .and_then(|con| async move {
68 let cmd = move || {
69 let mut con = con.clone();
70 async move {
71 redis::cmd("SET")
72 .arg("key1")
73 .arg(b"foo")
74 .query_async(&mut con)
75 .await
76 }
77 };
78 let result: RedisResult<()> = cmd().await;
79 assert_eq!(
80 result.as_ref().unwrap_err().kind(),
81 redis::ErrorKind::IoError,
82 "{}",
83 result.as_ref().unwrap_err()
84 );
85 cmd().await
86 })
87 .map(|result| {
88 assert_eq!(
89 result.as_ref().unwrap_err().kind(),
90 redis::ErrorKind::IoError,
91 "{}",
92 result.as_ref().unwrap_err()
93 );
94 })
95 .await
96 });
97 }
98
99 #[test]
test_pipeline_transaction()100 fn test_pipeline_transaction() {
101 let ctx = TestContext::new();
102 block_on_all_using_async_std(async move {
103 let mut con = ctx.async_connection_async_std().await?;
104 let mut pipe = redis::pipe();
105 pipe.atomic()
106 .cmd("SET")
107 .arg("key_1")
108 .arg(42)
109 .ignore()
110 .cmd("SET")
111 .arg("key_2")
112 .arg(43)
113 .ignore()
114 .cmd("MGET")
115 .arg(&["key_1", "key_2"]);
116 pipe.query_async(&mut con)
117 .map_ok(|((k1, k2),): ((i32, i32),)| {
118 assert_eq!(k1, 42);
119 assert_eq!(k2, 43);
120 })
121 .await
122 })
123 .unwrap();
124 }
125
test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send126 fn test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future<Output = RedisResult<()>> + Send {
127 let mut con = con.clone();
128 async move {
129 let key = format!("key{}", i);
130 let key_2 = key.clone();
131 let key2 = format!("key{}_2", i);
132 let key2_2 = key2.clone();
133
134 let foo_val = format!("foo{}", i);
135
136 redis::cmd("SET")
137 .arg(&key[..])
138 .arg(foo_val.as_bytes())
139 .query_async(&mut con)
140 .await?;
141 redis::cmd("SET")
142 .arg(&[&key2, "bar"])
143 .query_async(&mut con)
144 .await?;
145 redis::cmd("MGET")
146 .arg(&[&key_2, &key2_2])
147 .query_async(&mut con)
148 .map(|result| {
149 assert_eq!(Ok((foo_val, b"bar".to_vec())), result);
150 Ok(())
151 })
152 .await
153 }
154 }
155
test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>>156 fn test_error(con: &MultiplexedConnection) -> impl Future<Output = RedisResult<()>> {
157 let mut con = con.clone();
158 async move {
159 redis::cmd("SET")
160 .query_async(&mut con)
161 .map(|result| match result {
162 Ok(()) => panic!("Expected redis to return an error"),
163 Err(_) => Ok(()),
164 })
165 .await
166 }
167 }
168
169 #[test]
test_args_multiplexed_connection()170 fn test_args_multiplexed_connection() {
171 let ctx = TestContext::new();
172 block_on_all_using_async_std(async move {
173 ctx.multiplexed_async_connection_async_std()
174 .and_then(|con| {
175 let cmds = (0..100).map(move |i| test_cmd(&con, i));
176 future::try_join_all(cmds).map_ok(|results| {
177 assert_eq!(results.len(), 100);
178 })
179 })
180 .map_err(|err| panic!("{}", err))
181 .await
182 })
183 .unwrap();
184 }
185
186 #[test]
test_args_with_errors_multiplexed_connection()187 fn test_args_with_errors_multiplexed_connection() {
188 let ctx = TestContext::new();
189 block_on_all_using_async_std(async move {
190 ctx.multiplexed_async_connection_async_std()
191 .and_then(|con| {
192 let cmds = (0..100).map(move |i| {
193 let con = con.clone();
194 async move {
195 if i % 2 == 0 {
196 test_cmd(&con, i).await
197 } else {
198 test_error(&con).await
199 }
200 }
201 });
202 future::try_join_all(cmds).map_ok(|results| {
203 assert_eq!(results.len(), 100);
204 })
205 })
206 .map_err(|err| panic!("{}", err))
207 .await
208 })
209 .unwrap();
210 }
211
212 #[test]
test_transaction_multiplexed_connection()213 fn test_transaction_multiplexed_connection() {
214 let ctx = TestContext::new();
215 block_on_all_using_async_std(async move {
216 ctx.multiplexed_async_connection_async_std()
217 .and_then(|con| {
218 let cmds = (0..100).map(move |i| {
219 let mut con = con.clone();
220 async move {
221 let foo_val = i;
222 let bar_val = format!("bar{}", i);
223
224 let mut pipe = redis::pipe();
225 pipe.atomic()
226 .cmd("SET")
227 .arg("key")
228 .arg(foo_val)
229 .ignore()
230 .cmd("SET")
231 .arg(&["key2", &bar_val[..]])
232 .ignore()
233 .cmd("MGET")
234 .arg(&["key", "key2"]);
235
236 pipe.query_async(&mut con)
237 .map(move |result| {
238 assert_eq!(Ok(((foo_val, bar_val.into_bytes()),)), result);
239 result
240 })
241 .await
242 }
243 });
244 future::try_join_all(cmds)
245 })
246 .map_ok(|results| {
247 assert_eq!(results.len(), 100);
248 })
249 .map_err(|err| panic!("{}", err))
250 .await
251 })
252 .unwrap();
253 }
254
255 #[test]
256 #[cfg(feature = "script")]
test_script()257 fn test_script() {
258 use redis::RedisError;
259
260 // Note this test runs both scripts twice to test when they have already been loaded
261 // into Redis and when they need to be loaded in
262 let script1 = redis::Script::new("return redis.call('SET', KEYS[1], ARGV[1])");
263 let script2 = redis::Script::new("return redis.call('GET', KEYS[1])");
264
265 let ctx = TestContext::new();
266
267 block_on_all_using_async_std(async move {
268 let mut con = ctx.multiplexed_async_connection_async_std().await?;
269 script1
270 .key("key1")
271 .arg("foo")
272 .invoke_async(&mut con)
273 .await?;
274 let val: String = script2.key("key1").invoke_async(&mut con).await?;
275 assert_eq!(val, "foo");
276 script1
277 .key("key1")
278 .arg("bar")
279 .invoke_async(&mut con)
280 .await?;
281 let val: String = script2.key("key1").invoke_async(&mut con).await?;
282 assert_eq!(val, "bar");
283 Ok(())
284 })
285 .map_err(|err: RedisError| err)
286 .unwrap();
287 }
288
289 #[test]
290 #[cfg(feature = "script")]
test_script_returning_complex_type()291 fn test_script_returning_complex_type() {
292 let ctx = TestContext::new();
293 block_on_all_using_async_std(async {
294 let mut con = ctx.multiplexed_async_connection_async_std().await?;
295 redis::Script::new("return {1, ARGV[1], true}")
296 .arg("hello")
297 .invoke_async(&mut con)
298 .map_ok(|(i, s, b): (i32, String, bool)| {
299 assert_eq!(i, 1);
300 assert_eq!(s, "hello");
301 assert_eq!(b, true);
302 })
303 .await
304 })
305 .unwrap();
306 }
307