1 #![cfg(feature = "streams")]
2
3 use redis::streams::*;
4 use redis::{Commands, Connection, RedisResult, ToRedisArgs};
5
6 mod support;
7 use crate::support::*;
8
9 use std::collections::BTreeMap;
10 use std::str;
11 use std::thread::sleep;
12 use std::time::Duration;
13
/// Asserts that `$value.to_redis_args()` encodes to exactly the given
/// sequence of UTF-8 string arguments.
///
/// Panics if any encoded argument is not valid UTF-8 or if the decoded
/// arguments differ from the expected list.
macro_rules! assert_args {
    ($value:expr, $($args:expr),+) => {
        let encoded = $value.to_redis_args();
        let expected = vec![$($args),+];
        let mut decoded = Vec::with_capacity(encoded.len());
        for raw in encoded.iter() {
            decoded.push(str::from_utf8(raw.as_ref()).unwrap());
        }
        assert_eq!(decoded, expected);
    }
}
23
xadd(con: &mut Connection)24 fn xadd(con: &mut Connection) {
25 let _: RedisResult<String> =
26 con.xadd("k1", "1000-0", &[("hello", "world"), ("redis", "streams")]);
27 let _: RedisResult<String> = con.xadd("k1", "1000-1", &[("hello", "world2")]);
28 let _: RedisResult<String> = con.xadd("k2", "2000-0", &[("hello", "world")]);
29 let _: RedisResult<String> = con.xadd("k2", "2000-1", &[("hello", "world2")]);
30 }
31
xadd_keyrange(con: &mut Connection, key: &str, start: i32, end: i32)32 fn xadd_keyrange(con: &mut Connection, key: &str, start: i32, end: i32) {
33 for _i in start..end {
34 let _: RedisResult<String> = con.xadd(key, "*", &[("h", "w")]);
35 }
36 }
37
#[test]
fn test_cmd_options() {
    // Exercises the argument encoding of the option builders:
    //   StreamClaimOptions (xclaim_options)
    //   StreamReadOptions  (xread_options)
    //   StreamMaxlen       (maxlen enum)

    // default builders must not emit any arguments
    let default_claim = StreamClaimOptions::default();
    assert!(default_claim.to_redis_args().is_empty());

    let default_read = StreamReadOptions::default();
    assert!(default_read.to_redis_args().is_empty());

    // claim options: every setter applied
    let claim_opts = StreamClaimOptions::default()
        .idle(50)
        .time(500)
        .retry(3)
        .with_force()
        .with_justid();

    assert_args!(
        &claim_opts,
        "IDLE",
        "50",
        "TIME",
        "500",
        "RETRYCOUNT",
        "3",
        "FORCE",
        "JUSTID"
    );

    // maxlen variants
    assert_args!(StreamMaxlen::Approx(10), "MAXLEN", "~", "10");
    assert_args!(StreamMaxlen::Equals(10), "MAXLEN", "=", "10");

    // read options: every setter applied, including a consumer group
    let grouped_read = StreamReadOptions::default()
        .noack()
        .block(100)
        .count(200)
        .group("group-name", "consumer-name");

    assert_args!(
        &grouped_read,
        "BLOCK",
        "100",
        "COUNT",
        "200",
        "NOACK",
        "GROUP",
        "group-name",
        "consumer-name"
    );

    // without group(..) the NOACK flag is expected to be left out
    let ungrouped_read = StreamReadOptions::default().noack().block(100).count(200);

    assert_args!(&ungrouped_read, "BLOCK", "100", "COUNT", "200");
}
102
#[test]
fn test_assorted_1() {
    // Commands covered here:
    //   xadd, xadd_map, xadd_maxlen, xread, xrange_all, xlen

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    xadd(&mut con);

    // an explicitly supplied id must be echoed back unchanged
    let echoed: String = con.xadd("k0", "1000-0", &[("x", "y")]).unwrap();
    assert_eq!(echoed, "1000-0");

    // read three streams from the beginning; k3 does not exist yet
    let read: StreamReadReply = con.xread(&["k1", "k2", "k3"], &["0", "0", "0"]).unwrap();

    // only the two existing streams come back
    assert_eq!(read.keys.len(), 2);

    // first stream and its first entry
    let first_stream = &read.keys[0];
    assert_eq!(first_stream.key, "k1");
    assert_eq!(first_stream.ids.len(), 2);
    assert_eq!(first_stream.ids[0].id, "1000-0");

    // field lookup on a single entry
    let hello: Option<String> = first_stream.ids[0].get("hello");
    assert_eq!(hello, Some(String::from("world")));

    // second stream
    let second_stream = &read.keys[1];
    assert_eq!(second_stream.key, "k2");
    assert_eq!(second_stream.ids.len(), 2);
    assert_eq!(second_stream.ids[0].id, "2000-0");

    // xadd_map: write one entry built from a map of fields
    let fields: BTreeMap<&str, &str> = [("ab", "cd"), ("ef", "gh"), ("ij", "kl")]
        .iter()
        .cloned()
        .collect();
    let _: RedisResult<String> = con.xadd_map("k3", "3000-0", fields);

    let ranged: StreamRangeReply = con.xrange_all("k3").unwrap();
    for field in ["ab", "ef", "ij"].iter() {
        assert!(ranged.ids[0].contains_key(field));
    }

    // xadd with a maximum length: start from 100 entries in k4
    xadd_keyrange(&mut con, "k4", 0, 100);

    let before: usize = con.xlen("k4").unwrap();
    assert_eq!(before, 100);

    // one more entry with an exact cap of 10 leaves 10 entries behind
    let _: RedisResult<String> =
        con.xadd_maxlen("k4", StreamMaxlen::Equals(10), "*", &[("h", "w")]);
    let after: usize = con.xlen("k4").unwrap();
    assert_eq!(after, 10);
}
168
#[test]
fn test_xgroup_create() {
    // Commands covered here:
    //   xadd, xinfo_stream, xgroup_create, xinfo_groups

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    xadd(&mut con);

    // XINFO STREAM on a missing key must fail
    let missing: RedisResult<StreamInfoStreamReply> = con.xinfo_stream("k10");
    assert!(missing.is_err());

    // the original author observed that the failed call above leaves the
    // connection unusable, so a fresh one is opened before continuing
    con = ctx.connection();

    // XINFO STREAM on a populated key
    let info: StreamInfoStreamReply = con.xinfo_stream("k1").unwrap();
    assert_eq!(info.first_entry.id, "1000-0");
    assert_eq!(info.last_entry.id, "1000-1");
    assert_eq!(info.last_generated_id, "1000-1");

    // create a group on the existing stream
    let created: RedisResult<String> = con.xgroup_create("k1", "g1", "$");
    assert!(created.is_ok());

    // the new group must be the only one listed
    let listed: RedisResult<StreamInfoGroupsReply> = con.xinfo_groups("k1");
    assert!(listed.is_ok());
    let groups = listed.unwrap().groups;
    assert_eq!(groups.len(), 1);
    assert_eq!(groups[0].name, "g1");
}
206
#[test]
fn test_assorted_2() {
    // Tests the following commands....
    // xadd
    // xinfo_groups
    // xinfo_consumers
    // xgroup_create_mkstream
    // xread_options
    // xack
    // xpending
    // xpending_count
    // xpending_consumer_count

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    xadd(&mut con);

    // test xgroup create w/ mkstream @ 0
    let result: RedisResult<String> = con.xgroup_create_mkstream("k99", "g99", "0");
    assert!(result.is_ok());

    // Since nothing exists on this stream yet,
    // it should have the defaults returned by the client
    let result: RedisResult<StreamInfoGroupsReply> = con.xinfo_groups("k99");
    assert!(result.is_ok());
    let reply = result.unwrap();
    assert_eq!(reply.groups.len(), 1);
    assert_eq!(reply.groups[0].name, "g99");
    assert_eq!(reply.groups[0].last_delivered_id, "0-0");

    // call xadd on k99 just so we can read from it
    // using group g99 and test xinfo_consumers
    let _: RedisResult<String> = con.xadd("k99", "1000-0", &[("a", "b"), ("c", "d")]);
    let _: RedisResult<String> = con.xadd("k99", "1000-1", &[("e", "f"), ("g", "h")]);

    // test empty PEL: nothing has been delivered to the group yet
    let empty_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap();

    assert_eq!(empty_reply.count(), 0);
    match empty_reply {
        StreamPendingReply::Empty => {
            // looks good
        }
        _ => panic!("Expected StreamPendingReply::Empty but got Data"),
    }

    // passing options w/ group triggers XREADGROUP
    // using ID=">" means all undelivered ids
    // otherwise, ID="0 | ms-num" means all pending already
    // sent to this client
    let reply: StreamReadReply = con
        .xread_options(
            &["k99"],
            &[">"],
            StreamReadOptions::default().group("g99", "c99"),
        )
        .unwrap();
    assert_eq!(reply.keys[0].ids.len(), 2);

    // read xinfo consumers, should have 2 messages for the c99 consumer
    let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap();
    assert_eq!(reply.consumers[0].pending, 2);

    // ack one of these messages
    let result: RedisResult<i32> = con.xack("k99", "g99", &["1000-0"]);
    assert_eq!(result, Ok(1));

    // get pending messages already seen by this client
    // we should only have one now..
    let reply: StreamReadReply = con
        .xread_options(
            &["k99"],
            &["0"],
            StreamReadOptions::default().group("g99", "c99"),
        )
        .unwrap();
    assert_eq!(reply.keys.len(), 1);
    // The key count alone is 1 no matter how many entries are pending, so
    // also pin the number of entries: only 1000-1 is still unacknowledged.
    assert_eq!(reply.keys[0].ids.len(), 1);

    // we should also have one pending here...
    let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap();
    assert_eq!(reply.consumers[0].pending, 1);

    // add more and read so we can test xpending
    let _: RedisResult<String> = con.xadd("k99", "1001-0", &[("i", "j"), ("k", "l")]);
    let _: RedisResult<String> = con.xadd("k99", "1001-1", &[("m", "n"), ("o", "p")]);
    let _: StreamReadReply = con
        .xread_options(
            &["k99"],
            &[">"],
            StreamReadOptions::default().group("g99", "c99"),
        )
        .unwrap();

    // call xpending here...
    // this has a different reply from what the count variations return
    let data_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap();

    assert_eq!(data_reply.count(), 3);

    match data_reply {
        StreamPendingReply::Data(data) => assert_stream_pending_data(data),
        _ => panic!("Expected StreamPendingReply::Data but got Empty"),
    }

    // both count variations have the same reply types
    let reply: StreamPendingCountReply = con.xpending_count("k99", "g99", "-", "+", 10).unwrap();
    assert_eq!(reply.ids.len(), 3);

    let reply: StreamPendingCountReply = con
        .xpending_consumer_count("k99", "g99", "-", "+", 10, "c99")
        .unwrap();
    assert_eq!(reply.ids.len(), 3);

    // every pending entry must name its id and consumer and must have been
    // delivered at least once
    for StreamPendingId {
        id,
        consumer,
        times_delivered,
        last_delivered_ms: _,
    } in reply.ids
    {
        assert!(!id.is_empty());
        assert!(!consumer.is_empty());
        assert!(times_delivered > 0);
    }
}
334
assert_stream_pending_data(data: StreamPendingData)335 fn assert_stream_pending_data(data: StreamPendingData) {
336 assert_eq!(data.start_id, "1000-1");
337 assert_eq!(data.end_id, "1001-1");
338 assert_eq!(data.consumers.len(), 1);
339 assert_eq!(data.consumers[0].name, "c99");
340 }
341
#[test]
fn test_xadd_maxlen_map() {
    // Commands covered here:
    //   xadd_maxlen_map, xlen, xrange_all

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    // write ten entries, each capped to an exact stream length of 3
    for i in 0..10 {
        let idx = i.to_string();
        let mut fields: BTreeMap<&str, &str> = BTreeMap::new();
        fields.insert("idx", idx.as_str());
        let _: RedisResult<String> =
            con.xadd_maxlen_map("maxlen_map", StreamMaxlen::Equals(3), "*", fields);
    }

    // only the cap's worth of entries may remain
    let len: RedisResult<usize> = con.xlen("maxlen_map");
    assert_eq!(len, Ok(3));

    // and they must be the three most recent ones, oldest first
    let remaining: StreamRangeReply = con.xrange_all("maxlen_map").unwrap();
    for (pos, expected) in ["7", "8", "9"].iter().enumerate() {
        assert_eq!(remaining.ids[pos].get("idx"), Some(expected.to_string()));
    }
}
363
#[test]
fn test_xclaim() {
    // Tests the following commands....
    // xclaim
    // xclaim_options
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    // xclaim test basic idea:
    // 1. we need to test adding messages to a group
    // 2. then xreadgroup needs to define a consumer and read pending
    //    messages without acking them
    // 3. then we need to sleep 5ms and call xpending
    // 4. from here we should be able to claim message
    //    past the idle time and read them from a different consumer

    // create the group
    let result: RedisResult<String> = con.xgroup_create_mkstream("k1", "g1", "$");
    assert!(result.is_ok());

    // add some keys
    xadd_keyrange(&mut con, "k1", 0, 10);

    // read the pending items for this key & group
    let delivered: StreamReadReply = con
        .xread_options(
            &["k1"],
            &[">"],
            StreamReadOptions::default().group("g1", "c1"),
        )
        .unwrap();
    // verify we have 10 ids
    assert_eq!(delivered.keys[0].ids.len(), 10);

    // save the first StreamId and the full list of ids for later
    let claim = &delivered.keys[0].ids[0];
    let claim_justids: Vec<&String> = delivered.keys[0].ids.iter().map(|msg| &msg.id).collect();

    // sleep for 5ms so the entries exceed the 4ms min-idle-time used below
    sleep(Duration::from_millis(5));

    // grab this id if > 4ms
    let claimed_one: StreamClaimReply = con
        .xclaim("k1", "g1", "c2", 4, &[claim.id.as_str()])
        .unwrap();
    assert_eq!(claimed_one.ids.len(), 1);
    assert_eq!(claimed_one.ids[0].id, claim.id);

    // grab all pending ids for this key...
    // we should have 9 in c1 and 1 in c2
    let pending: StreamPendingReply = con.xpending("k1", "g1").unwrap();
    match pending {
        StreamPendingReply::Data(data) => {
            assert_eq!(data.consumers[0].name, "c1");
            assert_eq!(data.consumers[0].pending, 9);
            assert_eq!(data.consumers[1].name, "c2");
            assert_eq!(data.consumers[1].pending, 1);
        }
        // an empty reply would otherwise skip every assertion above
        _ => panic!("Expected StreamPendingReply::Data but got Empty"),
    }

    // sleep for 5ms
    sleep(Duration::from_millis(5));

    // lets test some of the xclaim_options
    // call force on the same claim.id
    let _: StreamClaimReply = con
        .xclaim_options(
            "k1",
            "g1",
            "c3",
            4,
            &[claim.id.as_str()],
            StreamClaimOptions::default().with_force(),
        )
        .unwrap();

    let pending: StreamPendingReply = con.xpending("k1", "g1").unwrap();
    // we should have 9 w/ c1 and 1 w/ c3 now
    match pending {
        StreamPendingReply::Data(data) => {
            assert_eq!(data.consumers[0].name, "c1");
            assert_eq!(data.consumers[0].pending, 9);
            assert_eq!(data.consumers[1].name, "c3");
            assert_eq!(data.consumers[1].pending, 1);
        }
        _ => panic!("Expected StreamPendingReply::Data but got Empty"),
    }

    // sleep for 5ms
    sleep(Duration::from_millis(5));

    // claim and only return JUSTID
    let claimed: Vec<String> = con
        .xclaim_options(
            "k1",
            "g1",
            "c5",
            4,
            &claim_justids,
            StreamClaimOptions::default().with_force().with_justid(),
        )
        .unwrap();
    // we just claimed the original 10 ids
    // and only returned the ids
    assert_eq!(claimed.len(), 10);
}
468
#[test]
fn test_xdel() {
    // Commands covered here:
    //   xdel

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    // populate k1 and k2
    xadd(&mut con);

    // removing one existing entry reports one deletion
    let single: i32 = con.xdel("k1", &["1000-0"]).unwrap();
    assert_eq!(single, 1);

    // 2000-2 was never written, so only two of the three ids are deleted
    let several: i32 = con.xdel("k2", &["2000-0", "2000-1", "2000-2"]).unwrap();
    assert_eq!(several, 2);
}
488
#[test]
fn test_xtrim() {
    // Tests the following commands....
    // xtrim
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    // add 100 entries
    xadd_keyrange(&mut con, "k1", 0, 100);

    // trim the stream to exactly 50 entries;
    // the reply is the number of entries evicted, not the number remaining
    let result: RedisResult<i32> = con.xtrim("k1", StreamMaxlen::Equals(50));
    assert_eq!(result, Ok(50));
    let remaining: RedisResult<usize> = con.xlen("k1");
    assert_eq!(remaining, Ok(50));

    // trimming 50 entries down to 10 evicts another 40
    let result: RedisResult<i32> = con.xtrim("k1", StreamMaxlen::Equals(10));
    assert_eq!(result, Ok(40));
    let remaining: RedisResult<usize> = con.xlen("k1");
    assert_eq!(remaining, Ok(10));
}
507
#[test]
fn test_xgroup() {
    // Commands covered here:
    //   xgroup_create_mkstream, xgroup_create, xgroup_destroy,
    //   xgroup_delconsumer

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    // create a group (and the stream with it), starting at id 0
    let created: RedisResult<String> = con.xgroup_create_mkstream("k1", "g1", "0");
    assert!(created.is_ok());

    // tear the fresh group down again
    let destroyed: i32 = con.xgroup_destroy("k1", "g1").unwrap();
    assert_eq!(destroyed, 1);

    // populate the streams
    xadd(&mut con);

    // recreate the group, this time on a stream that already exists
    let recreated: RedisResult<String> = con.xgroup_create("k1", "g1", "0");
    assert!(recreated.is_ok());

    // reading through the group registers consumer c1 and hands it both entries
    let delivered: StreamReadReply = con
        .xread_options(
            &["k1"],
            &[">"],
            StreamReadOptions::default().group("g1", "c1"),
        )
        .unwrap();
    assert_eq!(delivered.keys[0].ids.len(), 2);

    // deleting the consumer reports how many messages it still had pending
    let dropped_pending: i32 = con.xgroup_delconsumer("k1", "g1", "c1").unwrap();
    assert_eq!(dropped_pending, 2);

    let destroyed: i32 = con.xgroup_destroy("k1", "g1").unwrap();
    assert_eq!(destroyed, 1);
}
550
#[test]
fn test_xrange() {
    // Commands covered here:
    //   xrange (with - / + bounds), xrange_all, xrange_count

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    xadd(&mut con);

    // the whole stream: both entries of k1
    let everything: StreamRangeReply = con.xrange_all("k1").unwrap();
    assert_eq!(everything.ids.len(), 2);

    // from the last entry up to the end
    let tail: StreamRangeReply = con.xrange("k1", "1000-1", "+").unwrap();
    assert_eq!(tail.ids.len(), 1);

    // from the start up to the first entry
    let head: StreamRangeReply = con.xrange("k1", "-", "1000-0").unwrap();
    assert_eq!(head.ids.len(), 1);

    // full range, limited to a single entry
    let limited: StreamRangeReply = con.xrange_count("k1", "-", "+", 1).unwrap();
    assert_eq!(limited.ids.len(), 1);
}
576
#[test]
fn test_xrevrange() {
    // Commands covered here:
    //   xrevrange (with + / - bounds), xrevrange_all, xrevrange_count

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    xadd(&mut con);

    // the whole stream in reverse: both entries of k1
    let everything: StreamRangeReply = con.xrevrange_all("k1").unwrap();
    assert_eq!(everything.ids.len(), 2);

    // from the last entry back to the start
    let from_last: StreamRangeReply = con.xrevrange("k1", "1000-1", "-").unwrap();
    assert_eq!(from_last.ids.len(), 2);

    // from the end back to the last entry
    let newest_only: StreamRangeReply = con.xrevrange("k1", "+", "1000-1").unwrap();
    assert_eq!(newest_only.ids.len(), 1);

    // full reverse range, limited to a single entry
    let limited: StreamRangeReply = con.xrevrange_count("k1", "+", "-", 1).unwrap();
    assert_eq!(limited.ids.len(), 1);
}
602