1 #![cfg(feature = "streams")]
2 
3 use redis::streams::*;
4 use redis::{Commands, Connection, RedisResult, ToRedisArgs};
5 
6 mod support;
7 use crate::support::*;
8 
9 use std::collections::BTreeMap;
10 use std::str;
11 use std::thread::sleep;
12 use std::time::Duration;
13 
14 macro_rules! assert_args {
15     ($value:expr, $($args:expr),+) => {
16         let args = $value.to_redis_args();
17         let strings: Vec<_> = args.iter()
18                                 .map(|a| str::from_utf8(a.as_ref()).unwrap())
19                                 .collect();
20         assert_eq!(strings, vec![$($args),+]);
21     }
22 }
23 
xadd(con: &mut Connection)24 fn xadd(con: &mut Connection) {
25     let _: RedisResult<String> =
26         con.xadd("k1", "1000-0", &[("hello", "world"), ("redis", "streams")]);
27     let _: RedisResult<String> = con.xadd("k1", "1000-1", &[("hello", "world2")]);
28     let _: RedisResult<String> = con.xadd("k2", "2000-0", &[("hello", "world")]);
29     let _: RedisResult<String> = con.xadd("k2", "2000-1", &[("hello", "world2")]);
30 }
31 
xadd_keyrange(con: &mut Connection, key: &str, start: i32, end: i32)32 fn xadd_keyrange(con: &mut Connection, key: &str, start: i32, end: i32) {
33     for _i in start..end {
34         let _: RedisResult<String> = con.xadd(key, "*", &[("h", "w")]);
35     }
36 }
37 
38 #[test]
test_cmd_options()39 fn test_cmd_options() {
40     // Tests the following command option builders....
41     // xclaim_options
42     // xread_options
43     // maxlen enum
44 
45     // test read options
46 
47     let empty = StreamClaimOptions::default();
48     assert_eq!(ToRedisArgs::to_redis_args(&empty).len(), 0);
49 
50     let empty = StreamReadOptions::default();
51     assert_eq!(ToRedisArgs::to_redis_args(&empty).len(), 0);
52 
53     let opts = StreamClaimOptions::default()
54         .idle(50)
55         .time(500)
56         .retry(3)
57         .with_force()
58         .with_justid();
59 
60     assert_args!(
61         &opts,
62         "IDLE",
63         "50",
64         "TIME",
65         "500",
66         "RETRYCOUNT",
67         "3",
68         "FORCE",
69         "JUSTID"
70     );
71 
72     // test maxlen options
73 
74     assert_args!(StreamMaxlen::Approx(10), "MAXLEN", "~", "10");
75     assert_args!(StreamMaxlen::Equals(10), "MAXLEN", "=", "10");
76 
77     // test read options
78 
79     let opts = StreamReadOptions::default()
80         .noack()
81         .block(100)
82         .count(200)
83         .group("group-name", "consumer-name");
84 
85     assert_args!(
86         &opts,
87         "BLOCK",
88         "100",
89         "COUNT",
90         "200",
91         "NOACK",
92         "GROUP",
93         "group-name",
94         "consumer-name"
95     );
96 
97     // should skip noack because of missing group(,)
98     let opts = StreamReadOptions::default().noack().block(100).count(200);
99 
100     assert_args!(&opts, "BLOCK", "100", "COUNT", "200");
101 }
102 
103 #[test]
test_assorted_1()104 fn test_assorted_1() {
105     // Tests the following commands....
106     // xadd
107     // xadd_map (skip this for now)
108     // xadd_maxlen
109     // xread
110     // xlen
111 
112     let ctx = TestContext::new();
113     let mut con = ctx.connection();
114 
115     xadd(&mut con);
116 
117     // smoke test that we get the same id back
118     let result: RedisResult<String> = con.xadd("k0", "1000-0", &[("x", "y")]);
119     assert_eq!(result.unwrap(), "1000-0");
120 
121     // xread reply
122     let reply: StreamReadReply = con.xread(&["k1", "k2", "k3"], &["0", "0", "0"]).unwrap();
123 
124     // verify reply contains 2 keys even though we asked for 3
125     assert_eq!(&reply.keys.len(), &2usize);
126 
127     // verify first key & first id exist
128     assert_eq!(&reply.keys[0].key, "k1");
129     assert_eq!(&reply.keys[0].ids.len(), &2usize);
130     assert_eq!(&reply.keys[0].ids[0].id, "1000-0");
131 
132     // lookup the key in StreamId map
133     let hello: Option<String> = reply.keys[0].ids[0].get("hello");
134     assert_eq!(hello, Some("world".to_string()));
135 
136     // verify the second key was written
137     assert_eq!(&reply.keys[1].key, "k2");
138     assert_eq!(&reply.keys[1].ids.len(), &2usize);
139     assert_eq!(&reply.keys[1].ids[0].id, "2000-0");
140 
141     // test xadd_map
142     let mut map: BTreeMap<&str, &str> = BTreeMap::new();
143     map.insert("ab", "cd");
144     map.insert("ef", "gh");
145     map.insert("ij", "kl");
146     let _: RedisResult<String> = con.xadd_map("k3", "3000-0", map);
147 
148     let reply: StreamRangeReply = con.xrange_all("k3").unwrap();
149     assert_eq!(reply.ids[0].contains_key(&"ab"), true);
150     assert_eq!(reply.ids[0].contains_key(&"ef"), true);
151     assert_eq!(reply.ids[0].contains_key(&"ij"), true);
152 
153     // test xadd w/ maxlength below...
154 
155     // add 100 things to k4
156     xadd_keyrange(&mut con, "k4", 0, 100);
157 
158     // test xlen.. should have 100 items
159     let result: RedisResult<usize> = con.xlen("k4");
160     assert_eq!(result, Ok(100));
161 
162     // test xadd_maxlen
163     let _: RedisResult<String> =
164         con.xadd_maxlen("k4", StreamMaxlen::Equals(10), "*", &[("h", "w")]);
165     let result: RedisResult<usize> = con.xlen("k4");
166     assert_eq!(result, Ok(10));
167 }
168 
169 #[test]
test_xgroup_create()170 fn test_xgroup_create() {
171     // Tests the following commands....
172     // xadd
173     // xinfo_stream
174     // xgroup_create
175     // xinfo_groups
176 
177     let ctx = TestContext::new();
178     let mut con = ctx.connection();
179 
180     xadd(&mut con);
181 
182     // no key exists... this call breaks the connection pipe for some reason
183     let reply: RedisResult<StreamInfoStreamReply> = con.xinfo_stream("k10");
184     assert_eq!(reply.is_err(), true);
185 
186     // redo the connection because the above error
187     con = ctx.connection();
188 
189     // key should exist
190     let reply: StreamInfoStreamReply = con.xinfo_stream("k1").unwrap();
191     assert_eq!(&reply.first_entry.id, "1000-0");
192     assert_eq!(&reply.last_entry.id, "1000-1");
193     assert_eq!(&reply.last_generated_id, "1000-1");
194 
195     // xgroup create (existing stream)
196     let result: RedisResult<String> = con.xgroup_create("k1", "g1", "$");
197     assert_eq!(result.is_ok(), true);
198 
199     // xinfo groups (existing stream)
200     let result: RedisResult<StreamInfoGroupsReply> = con.xinfo_groups("k1");
201     assert_eq!(result.is_ok(), true);
202     let reply = result.unwrap();
203     assert_eq!(&reply.groups.len(), &1);
204     assert_eq!(&reply.groups[0].name, &"g1");
205 }
206 
207 #[test]
test_assorted_2()208 fn test_assorted_2() {
209     // Tests the following commands....
210     // xadd
211     // xinfo_stream
212     // xinfo_groups
213     // xinfo_consumer
214     // xgroup_create_mkstream
215     // xread_options
216     // xack
217     // xpending
218     // xpending_count
219     // xpending_consumer_count
220 
221     let ctx = TestContext::new();
222     let mut con = ctx.connection();
223 
224     xadd(&mut con);
225 
226     // test xgroup create w/ mkstream @ 0
227     let result: RedisResult<String> = con.xgroup_create_mkstream("k99", "g99", "0");
228     assert_eq!(result.is_ok(), true);
229 
230     // Since nothing exists on this stream yet,
231     // it should have the defaults returned by the client
232     let result: RedisResult<StreamInfoGroupsReply> = con.xinfo_groups("k99");
233     assert_eq!(result.is_ok(), true);
234     let reply = result.unwrap();
235     assert_eq!(&reply.groups.len(), &1);
236     assert_eq!(&reply.groups[0].name, &"g99");
237     assert_eq!(&reply.groups[0].last_delivered_id, &"0-0");
238 
239     // call xadd on k99 just so we can read from it
240     // using consumer g99 and test xinfo_consumers
241     let _: RedisResult<String> = con.xadd("k99", "1000-0", &[("a", "b"), ("c", "d")]);
242     let _: RedisResult<String> = con.xadd("k99", "1000-1", &[("e", "f"), ("g", "h")]);
243 
244     // test empty PEL
245     let empty_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap();
246 
247     assert_eq!(empty_reply.count(), 0);
248     if let StreamPendingReply::Empty = empty_reply {
249         // looks good
250     } else {
251         panic!("Expected StreamPendingReply::Empty but got Data");
252     }
253 
254     // passing options  w/ group triggers XREADGROUP
255     // using ID=">" means all undelivered ids
256     // otherwise, ID="0 | ms-num" means all pending already
257     // sent to this client
258     let reply: StreamReadReply = con
259         .xread_options(
260             &["k99"],
261             &[">"],
262             StreamReadOptions::default().group("g99", "c99"),
263         )
264         .unwrap();
265     assert_eq!(reply.keys[0].ids.len(), 2);
266 
267     // read xinfo consumers again, should have 2 messages for the c99 consumer
268     let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap();
269     assert_eq!(reply.consumers[0].pending, 2);
270 
271     // ack one of these messages
272     let result: RedisResult<i32> = con.xack("k99", "g99", &["1000-0"]);
273     assert_eq!(result, Ok(1));
274 
275     // get pending messages already seen by this client
276     // we should only have one now..
277     let reply: StreamReadReply = con
278         .xread_options(
279             &["k99"],
280             &["0"],
281             StreamReadOptions::default().group("g99", "c99"),
282         )
283         .unwrap();
284     assert_eq!(reply.keys.len(), 1);
285 
286     // we should also have one pending here...
287     let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap();
288     assert_eq!(reply.consumers[0].pending, 1);
289 
290     // add more and read so we can test xpending
291     let _: RedisResult<String> = con.xadd("k99", "1001-0", &[("i", "j"), ("k", "l")]);
292     let _: RedisResult<String> = con.xadd("k99", "1001-1", &[("m", "n"), ("o", "p")]);
293     let _: StreamReadReply = con
294         .xread_options(
295             &["k99"],
296             &[">"],
297             StreamReadOptions::default().group("g99", "c99"),
298         )
299         .unwrap();
300 
301     // call xpending here...
302     // this has a different reply from what the count variations return
303     let data_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap();
304 
305     assert_eq!(data_reply.count(), 3);
306 
307     if let StreamPendingReply::Data(data) = data_reply {
308         assert_stream_pending_data(data)
309     } else {
310         panic!("Expected StreamPendingReply::Data but got Empty");
311     }
312 
313     // both count variations have the same reply types
314     let reply: StreamPendingCountReply = con.xpending_count("k99", "g99", "-", "+", 10).unwrap();
315     assert_eq!(reply.ids.len(), 3);
316 
317     let reply: StreamPendingCountReply = con
318         .xpending_consumer_count("k99", "g99", "-", "+", 10, "c99")
319         .unwrap();
320     assert_eq!(reply.ids.len(), 3);
321 
322     for StreamPendingId {
323         id,
324         consumer,
325         times_delivered,
326         last_delivered_ms: _,
327     } in reply.ids
328     {
329         assert!(!id.is_empty());
330         assert!(!consumer.is_empty());
331         assert!(times_delivered > 0);
332     }
333 }
334 
assert_stream_pending_data(data: StreamPendingData)335 fn assert_stream_pending_data(data: StreamPendingData) {
336     assert_eq!(data.start_id, "1000-1");
337     assert_eq!(data.end_id, "1001-1");
338     assert_eq!(data.consumers.len(), 1);
339     assert_eq!(data.consumers[0].name, "c99");
340 }
341 
342 #[test]
test_xadd_maxlen_map()343 fn test_xadd_maxlen_map() {
344     let ctx = TestContext::new();
345     let mut con = ctx.connection();
346 
347     for i in 0..10 {
348         let mut map: BTreeMap<&str, &str> = BTreeMap::new();
349         let idx = i.to_string();
350         map.insert("idx", &idx);
351         let _: RedisResult<String> =
352             con.xadd_maxlen_map("maxlen_map", StreamMaxlen::Equals(3), "*", map);
353     }
354 
355     let result: RedisResult<usize> = con.xlen("maxlen_map");
356     assert_eq!(result, Ok(3));
357     let reply: StreamRangeReply = con.xrange_all("maxlen_map").unwrap();
358 
359     assert_eq!(reply.ids[0].get("idx"), Some("7".to_string()));
360     assert_eq!(reply.ids[1].get("idx"), Some("8".to_string()));
361     assert_eq!(reply.ids[2].get("idx"), Some("9".to_string()));
362 }
363 
364 #[test]
test_xclaim()365 fn test_xclaim() {
366     // Tests the following commands....
367     // xclaim
368     // xclaim_options
369     let ctx = TestContext::new();
370     let mut con = ctx.connection();
371 
372     // xclaim test basic idea:
373     // 1. we need to test adding messages to a group
374     // 2. then xreadgroup needs to define a consumer and read pending
375     //    messages without acking them
376     // 3. then we need to sleep 5ms and call xpending
377     // 4. from here we should be able to claim message
378     //    past the idle time and read them from a different consumer
379 
380     // create the group
381     let result: RedisResult<String> = con.xgroup_create_mkstream("k1", "g1", "$");
382     assert_eq!(result.is_ok(), true);
383 
384     // add some keys
385     xadd_keyrange(&mut con, "k1", 0, 10);
386 
387     // read the pending items for this key & group
388     let reply: StreamReadReply = con
389         .xread_options(
390             &["k1"],
391             &[">"],
392             StreamReadOptions::default().group("g1", "c1"),
393         )
394         .unwrap();
395     // verify we have 10 ids
396     assert_eq!(reply.keys[0].ids.len(), 10);
397 
398     // save this StreamId for later
399     let claim = &reply.keys[0].ids[0];
400     let _claim_1 = &reply.keys[0].ids[1];
401     let claim_justids = &reply.keys[0]
402         .ids
403         .iter()
404         .map(|msg| &msg.id)
405         .collect::<Vec<&String>>();
406 
407     // sleep for 5ms
408     sleep(Duration::from_millis(5));
409 
410     // grab this id if > 4ms
411     let reply: StreamClaimReply = con
412         .xclaim("k1", "g1", "c2", 4, &[claim.id.clone()])
413         .unwrap();
414     assert_eq!(reply.ids.len(), 1);
415     assert_eq!(reply.ids[0].id, claim.id);
416 
417     // grab all pending ids for this key...
418     // we should 9 in c1 and 1 in c2
419     let reply: StreamPendingReply = con.xpending("k1", "g1").unwrap();
420     if let StreamPendingReply::Data(data) = reply {
421         assert_eq!(data.consumers[0].name, "c1");
422         assert_eq!(data.consumers[0].pending, 9);
423         assert_eq!(data.consumers[1].name, "c2");
424         assert_eq!(data.consumers[1].pending, 1);
425     }
426 
427     // sleep for 5ms
428     sleep(Duration::from_millis(5));
429 
430     // lets test some of the xclaim_options
431     // call force on the same claim.id
432     let _: StreamClaimReply = con
433         .xclaim_options(
434             "k1",
435             "g1",
436             "c3",
437             4,
438             &[claim.id.clone()],
439             StreamClaimOptions::default().with_force(),
440         )
441         .unwrap();
442 
443     let reply: StreamPendingReply = con.xpending("k1", "g1").unwrap();
444     // we should have 9 w/ c1 and 1 w/ c3 now
445     if let StreamPendingReply::Data(data) = reply {
446         assert_eq!(data.consumers[1].name, "c3");
447         assert_eq!(data.consumers[1].pending, 1);
448     }
449 
450     // sleep for 5ms
451     sleep(Duration::from_millis(5));
452 
453     // claim and only return JUSTID
454     let claimed: Vec<String> = con
455         .xclaim_options(
456             "k1",
457             "g1",
458             "c5",
459             4,
460             &claim_justids,
461             StreamClaimOptions::default().with_force().with_justid(),
462         )
463         .unwrap();
464     // we just claimed the original 10 ids
465     // and only returned the ids
466     assert_eq!(claimed.len(), 10);
467 }
468 
469 #[test]
test_xdel()470 fn test_xdel() {
471     // Tests the following commands....
472     // xdel
473     let ctx = TestContext::new();
474     let mut con = ctx.connection();
475 
476     // add some keys
477     xadd(&mut con);
478 
479     // delete the first stream item for this key
480     let result: RedisResult<i32> = con.xdel("k1", &["1000-0"]);
481     // returns the number of items deleted
482     assert_eq!(result, Ok(1));
483 
484     let result: RedisResult<i32> = con.xdel("k2", &["2000-0", "2000-1", "2000-2"]);
485     // should equal 2 since the last id doesn't exist
486     assert_eq!(result, Ok(2));
487 }
488 
489 #[test]
test_xtrim()490 fn test_xtrim() {
491     // Tests the following commands....
492     // xtrim
493     let ctx = TestContext::new();
494     let mut con = ctx.connection();
495 
496     // add some keys
497     xadd_keyrange(&mut con, "k1", 0, 100);
498 
499     // trim key to 50
500     // returns the number of items remaining in the stream
501     let result: RedisResult<i32> = con.xtrim("k1", StreamMaxlen::Equals(50));
502     assert_eq!(result, Ok(50));
503     // we should end up with 40 after this call
504     let result: RedisResult<i32> = con.xtrim("k1", StreamMaxlen::Equals(10));
505     assert_eq!(result, Ok(40));
506 }
507 
508 #[test]
test_xgroup()509 fn test_xgroup() {
510     // Tests the following commands....
511     // xgroup_create_mkstream
512     // xgroup_destroy
513     // xgroup_delconsumer
514 
515     let ctx = TestContext::new();
516     let mut con = ctx.connection();
517 
518     // test xgroup create w/ mkstream @ 0
519     let result: RedisResult<String> = con.xgroup_create_mkstream("k1", "g1", "0");
520     assert_eq!(result.is_ok(), true);
521 
522     // destroy this new stream group
523     let result: RedisResult<i32> = con.xgroup_destroy("k1", "g1");
524     assert_eq!(result, Ok(1));
525 
526     // add some keys
527     xadd(&mut con);
528 
529     // create the group again using an existing stream
530     let result: RedisResult<String> = con.xgroup_create("k1", "g1", "0");
531     assert_eq!(result.is_ok(), true);
532 
533     // read from the group so we can register the consumer
534     let reply: StreamReadReply = con
535         .xread_options(
536             &["k1"],
537             &[">"],
538             StreamReadOptions::default().group("g1", "c1"),
539         )
540         .unwrap();
541     assert_eq!(reply.keys[0].ids.len(), 2);
542 
543     let result: RedisResult<i32> = con.xgroup_delconsumer("k1", "g1", "c1");
544     // returns the number of pending message this client had open
545     assert_eq!(result, Ok(2));
546 
547     let result: RedisResult<i32> = con.xgroup_destroy("k1", "g1");
548     assert_eq!(result, Ok(1));
549 }
550 
551 #[test]
test_xrange()552 fn test_xrange() {
553     // Tests the following commands....
554     // xrange (-/+ variations)
555     // xrange_all
556     // xrange_count
557 
558     let ctx = TestContext::new();
559     let mut con = ctx.connection();
560 
561     xadd(&mut con);
562 
563     // xrange replies
564     let reply: StreamRangeReply = con.xrange_all("k1").unwrap();
565     assert_eq!(reply.ids.len(), 2);
566 
567     let reply: StreamRangeReply = con.xrange("k1", "1000-1", "+").unwrap();
568     assert_eq!(reply.ids.len(), 1);
569 
570     let reply: StreamRangeReply = con.xrange("k1", "-", "1000-0").unwrap();
571     assert_eq!(reply.ids.len(), 1);
572 
573     let reply: StreamRangeReply = con.xrange_count("k1", "-", "+", 1).unwrap();
574     assert_eq!(reply.ids.len(), 1);
575 }
576 
577 #[test]
test_xrevrange()578 fn test_xrevrange() {
579     // Tests the following commands....
580     // xrevrange (+/- variations)
581     // xrevrange_all
582     // xrevrange_count
583 
584     let ctx = TestContext::new();
585     let mut con = ctx.connection();
586 
587     xadd(&mut con);
588 
589     // xrange replies
590     let reply: StreamRangeReply = con.xrevrange_all("k1").unwrap();
591     assert_eq!(reply.ids.len(), 2);
592 
593     let reply: StreamRangeReply = con.xrevrange("k1", "1000-1", "-").unwrap();
594     assert_eq!(reply.ids.len(), 2);
595 
596     let reply: StreamRangeReply = con.xrevrange("k1", "+", "1000-1").unwrap();
597     assert_eq!(reply.ids.len(), 1);
598 
599     let reply: StreamRangeReply = con.xrevrange_count("k1", "+", "-", 1).unwrap();
600     assert_eq!(reply.ids.len(), 1);
601 }
602