1import pytest
2import asyncio
3
4from collections import OrderedDict
5from unittest import mock
6
7from aioredis.commands.streams import parse_messages
8from aioredis.errors import BusyGroupError
9from _testutils import redis_version
10
# Module-wide pytest mark built by the ``_testutils.redis_version`` helper.
# Presumably it skips every test here when the server is older than 5.0.0
# (see the reason string) -- confirm against ``_testutils``.
pytestmark = redis_version(
    5, 0, 0, reason="Streams only available since Redis 5.0.0")
13
14
async def add_message_with_sleep(redis, stream, fields):
    """Wait 0.2 seconds, then add ``fields`` to ``stream`` via ``redis.xadd``.

    Returns whatever ``redis.xadd`` returns.
    """
    await asyncio.sleep(0.2)
    return await redis.xadd(stream, fields)
19
20
async def test_xadd(redis, server_bin):
    """XADD returns a ``<digits>-<digits>`` ID and the entry reads back."""
    payload = OrderedDict([
        (b'field1', b'value1'),
        (b'field2', b'value2'),
    ])
    new_id = await redis.xadd('test_stream', payload)

    # The ID should look like b'1507400517949-0': digits, a dash, digits
    assert b'-' in new_id
    millis, seq = new_id.split(b'-')
    assert millis.isdigit()
    assert seq.isdigit()

    # The stream must now contain exactly the entry we just added
    entries = await redis.xrange('test_stream')
    assert len(entries) == 1
    stored = entries[0]
    assert stored[0] == new_id
    assert stored[1] == OrderedDict([
        (b'field1', b'value1'),
        (b'field2', b'value2'),
    ])
43
44
async def test_xadd_maxlen_exact(redis, server_bin):
    """XADD with ``max_len=2, exact_len=True`` leaves only the newest two."""
    # The first entry's ID is never inspected (the entry is expected to be
    # trimmed away), so its return value is deliberately not bound.
    await redis.xadd('test_stream', {'f1': 'v1'})

    # Ensure the millisecond-based message ID increments
    await asyncio.sleep(0.001)
    message_id2 = await redis.xadd('test_stream', {'f2': 'v2'})
    await asyncio.sleep(0.001)
    message_id3 = await redis.xadd('test_stream', {'f3': 'v3'},
                                   max_len=2, exact_len=True)

    # Read it back
    messages = await redis.xrange('test_stream')
    assert len(messages) == 2

    message2 = messages[0]
    message3 = messages[1]

    # The first message should no longer exist, just messages
    # 2 and 3 remain
    assert message2[0] == message_id2
    assert message2[1] == OrderedDict([(b'f2', b'v2')])

    assert message3[0] == message_id3
    assert message3[1] == OrderedDict([(b'f3', b'v3')])
69
70
async def test_xadd_manual_message_ids(redis, server_bin):
    """Entries added with explicit ``message_id`` values keep those IDs."""
    manual_ids = (
        '1515958771000-0',
        '1515958771000-1',
        '1515958772000-0',
    )
    for manual_id in manual_ids:
        await redis.xadd('test_stream', {'f1': 'v1'}, message_id=manual_id)

    entries = await redis.xrange('test_stream')
    stored_ids = [entry_id for entry_id, _ in entries]
    assert stored_ids == [
        b'1515958771000-0',
        b'1515958771000-1',
        b'1515958772000-0',
    ]
83
84
async def test_xadd_maxlen_inexact(redis, server_bin):
    """XADD with ``exact_len=False`` trims lazily rather than at once."""
    await redis.xadd('test_stream', {'f1': 'v1'})
    # Sleep briefly so the millisecond-based message ID moves forward
    await asyncio.sleep(0.001)
    await redis.xadd('test_stream', {'f2': 'v2'})
    await asyncio.sleep(0.001)
    await redis.xadd('test_stream', {'f3': 'v3'}, max_len=2, exact_len=False)

    # All three entries are still expected: nothing has been trimmed yet
    entries = await redis.xrange('test_stream')
    assert len(entries) == 3

    # Keep adding with max_len=2; the stream must not simply keep growing
    for _ in range(1000):
        await redis.xadd('test_stream', {'f': 'v'}, max_len=2)

    entries = await redis.xrange('test_stream')
    assert len(entries) < 1000
104
105
async def test_xrange(redis, server_bin):
    """Exercise XRANGE with no bounds, ``start``, ``stop`` and ``count``."""
    stream = 'test_stream'
    fields = OrderedDict((
        (b'field1', b'value1'),
        (b'field2', b'value2'),
    ))
    message_id1 = await redis.xadd(stream, fields)
    message_id2 = await redis.xadd(stream, fields)
    # The third entry's ID is never referenced below, so it is not bound.
    await redis.xadd(stream, fields)

    # Test no parameters
    messages = await redis.xrange(stream)
    assert len(messages) == 3
    message = messages[0]
    assert message[0] == message_id1
    assert message[1] == OrderedDict([
        (b'field1', b'value1'),
        (b'field2', b'value2')]
    )

    # Test start
    messages = await redis.xrange(stream, start=message_id2)
    assert len(messages) == 2

    messages = await redis.xrange(stream, start='9900000000000-0')
    assert len(messages) == 0

    # Test stop
    messages = await redis.xrange(stream, stop='0000000000000-0')
    assert len(messages) == 0

    messages = await redis.xrange(stream, stop=message_id2)
    assert len(messages) == 2

    messages = await redis.xrange(stream, stop='9900000000000-0')
    assert len(messages) == 3

    # Test start & stop
    messages = await redis.xrange(stream,
                                  start=message_id1,
                                  stop=message_id2)
    assert len(messages) == 2

    messages = await redis.xrange(stream,
                                  start='0000000000000-0',
                                  stop='9900000000000-0')
    assert len(messages) == 3

    # Test count
    messages = await redis.xrange(stream, count=2)
    assert len(messages) == 2
157
158
async def test_xrevrange(redis, server_bin):
    """Exercise XREVRANGE with no bounds, ``start``, ``stop`` and ``count``."""
    stream = 'test_stream'
    payload = OrderedDict([
        (b'field1', b'value1'),
        (b'field2', b'value2'),
    ])
    first_id = await redis.xadd(stream, payload)
    second_id = await redis.xadd(stream, payload)
    third_id = await redis.xadd(stream, payload)

    # No parameters: everything comes back, newest entry first
    entries = await redis.xrevrange(stream)
    assert len(entries) == 3
    newest = entries[0]
    assert newest[0] == third_id
    assert newest[1] == OrderedDict([
        (b'field1', b'value1'),
        (b'field2', b'value2'),
    ])

    # start only
    entries = await redis.xrevrange(stream, start=second_id)
    assert len(entries) == 2

    entries = await redis.xrevrange(stream, start='9900000000000-0')
    assert len(entries) == 3

    # stop only
    entries = await redis.xrevrange(stream, stop='0000000000000-0')
    assert len(entries) == 3

    entries = await redis.xrevrange(stream, stop=second_id)
    assert len(entries) == 2

    entries = await redis.xrevrange(stream, stop='9900000000000-0')
    assert len(entries) == 0

    # start and stop together
    entries = await redis.xrevrange(
        stream, start=second_id, stop=first_id)
    assert len(entries) == 2

    entries = await redis.xrevrange(
        stream, start='9900000000000-0', stop='0000000000000-0')
    assert len(entries) == 3

    # count caps the number of entries returned
    entries = await redis.xrevrange(stream, count=2)
    assert len(entries) == 2
210
211
async def test_xread_selection(redis, server_bin):
    """Test use of counts and starting IDs"""
    stream = 'test_stream'
    fields = OrderedDict((
        (b'field1', b'value1'),
        (b'field2', b'value2'),
    ))
    message_id1 = await redis.xadd(stream, fields)
    # The middle entry's ID is never used as a starting point, so it is
    # not bound to a name.
    await redis.xadd(stream, fields)
    message_id3 = await redis.xadd(stream, fields)

    # Starting from the zero ID returns all three entries
    messages = await redis.xread([stream],
                                 timeout=1,
                                 latest_ids=['0000000000000-0'])
    assert len(messages) == 3

    # Starting from the first ID returns the two entries after it
    messages = await redis.xread([stream],
                                 timeout=1,
                                 latest_ids=[message_id1])
    assert len(messages) == 2

    # Starting from the last ID returns nothing
    messages = await redis.xread([stream],
                                 timeout=1,
                                 latest_ids=[message_id3])
    assert len(messages) == 0

    # count limits how many entries are returned
    messages = await redis.xread([stream],
                                 timeout=1,
                                 latest_ids=['0000000000000-0'], count=2)
    assert len(messages) == 2
242
243
async def test_xread_blocking(redis, create_redis, server, server_bin):
    """Test the blocking read features"""
    fields = OrderedDict((
        (b'field1', b'value1'),
        (b'field2', b'value2'),
    ))
    other_redis = await create_redis(
        server.tcp_address)

    # try/finally so the extra connection is closed even when an
    # assertion below fails.
    try:
        # create blocking task in separate connection
        consumer = other_redis.xread(['test_stream'], timeout=1000)

        # ensure_future() rather than instantiating asyncio.Task directly,
        # which the asyncio documentation discourages.
        producer_task = asyncio.ensure_future(
            add_message_with_sleep(redis, 'test_stream', fields))
        results = await asyncio.gather(consumer, producer_task)

        received_messages, sent_message_id = results
        assert len(received_messages) == 1
        assert sent_message_id

        received_stream, received_message_id, received_fields \
            = received_messages[0]

        assert received_stream == b'test_stream'
        assert sent_message_id == received_message_id
        assert fields == received_fields

        # Test that we get nothing back from an empty stream
        results = await redis.xread(['another_stream'], timeout=100)
        assert results == []
    finally:
        other_redis.close()
        await other_redis.wait_closed()
276
277
async def test_xgroup_create(redis, server_bin):
    """A group created on an existing stream is listed by xinfo_groups()."""
    await redis.xadd('test_stream', {'a': 1})
    await redis.xgroup_create('test_stream', 'test_group')

    # The last-delivered ID is not pinned down, hence mock.ANY
    expected_group = {
        b'name': b'test_group',
        b'last-delivered-id': mock.ANY,
        b'pending': 0,
        b'consumers': 0,
    }
    groups = await redis.xinfo_groups('test_stream')
    assert groups == [expected_group]
289
290
async def test_xgroup_create_mkstream(redis, server_bin):
    """``mkstream=True`` allows creating a group without a prior XADD."""
    await redis.xgroup_create('test_stream', 'test_group', mkstream=True)

    # The last-delivered ID is not pinned down, hence mock.ANY
    expected_group = {
        b'name': b'test_group',
        b'last-delivered-id': mock.ANY,
        b'pending': 0,
        b'consumers': 0,
    }
    groups = await redis.xinfo_groups('test_stream')
    assert groups == [expected_group]
300
301
async def test_xgroup_create_already_exists(redis, server_bin):
    """Creating the same group a second time raises BusyGroupError."""
    stream, group = 'test_stream', 'test_group'
    await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group)

    with pytest.raises(BusyGroupError):
        await redis.xgroup_create(stream, group)
307
308
async def test_xgroup_setid(redis, server_bin):
    """xgroup_setid() with the ``$`` ID completes without raising."""
    stream, group = 'test_stream', 'test_group'
    await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group)
    await redis.xgroup_setid(stream, group, '$')
313
314
async def test_xgroup_destroy(redis, server_bin):
    """After xgroup_destroy() the stream reports no groups."""
    stream, group = 'test_stream', 'test_group'
    await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group)
    await redis.xgroup_destroy(stream, group)

    remaining_groups = await redis.xinfo_groups(stream)
    assert not remaining_groups
321
322
async def test_xread_group(redis):
    """xread_group() returns the single entry that was added to the stream."""
    await redis.xadd('test_stream', {'a': 1})
    await redis.xgroup_create('test_stream', 'test_group', latest_id='0')

    # Read through the group, using the special '>' ID
    received = await redis.xread_group(
        'test_group', 'test_consumer', ['test_stream'],
        timeout=1000, latest_ids=['>']
    )
    assert len(received) == 1

    stream_name, entry_id, entry_fields = received[0]
    assert stream_name == b'test_stream'
    assert entry_id
    assert entry_fields == {b'a': b'1'}
337
338
async def test_xread_group_with_no_ack(redis):
    """xread_group() with ``no_ack=True`` still returns the added entry."""
    await redis.xadd('test_stream', {'a': 1})
    await redis.xgroup_create('test_stream', 'test_group', latest_id='0')

    # Same read as the plain group test, but with no_ack switched on
    received = await redis.xread_group(
        'test_group', 'test_consumer', ['test_stream'],
        timeout=1000, latest_ids=['>'], no_ack=True
    )
    assert len(received) == 1

    stream_name, entry_id, entry_fields = received[0]
    assert stream_name == b'test_stream'
    assert entry_id
    assert entry_fields == {b'a': b'1'}
353
354
async def test_xack_and_xpending(redis):
    """Follow one entry through read -> ack, checking xpending() each step."""
    stream, group = 'test_stream', 'test_group'
    added_id = await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group, latest_id='0')

    # Before any read, the group has nothing pending
    num_pending, lowest_id, highest_id, pel = \
        await redis.xpending(stream, group)
    assert num_pending == 0

    # Deliver the entry to test_consumer
    await redis.xread_group(
        'test_group', 'test_consumer', ['test_stream'],
        timeout=1000, latest_ids=['>']
    )

    # Delivered but unacknowledged: exactly our entry is pending
    num_pending, lowest_id, highest_id, pel = \
        await redis.xpending(stream, group)
    assert num_pending == 1
    assert lowest_id == added_id
    assert highest_id == added_id
    assert pel == [[b'test_consumer', b'1']]

    # Acknowledge it
    await redis.xack(stream, group, added_id)

    # After the ack nothing is pending any more
    num_pending, lowest_id, highest_id, pel = \
        await redis.xpending(stream, group)
    assert num_pending == 0
386
387
async def test_xpending_get_messages(redis):
    """xpending() with start/stop/count returns per-message pending details.

    Like test_xack_and_xpending(), but using the start/end xpending()
    params to get the messages.
    """
    message_id = await redis.xadd('test_stream', {'a': 1})
    await redis.xgroup_create('test_stream', 'test_group', latest_id='0')
    await redis.xread_group(
        'test_group', 'test_consumer', ['test_stream'],
        timeout=1000, latest_ids=['>']
    )
    # Let some idle time accumulate so the >= 50 ms check below can hold
    await asyncio.sleep(0.05)

    # It is now pending
    response = await redis.xpending('test_stream', 'test_group', '-', '+', 10)
    assert len(response) == 1
    # Unpack into a distinct name: reusing ``message_id`` here would shadow
    # the ID returned by XADD and make the comparison below impossible.
    (
        pending_message_id, consumer_name,
        milliseconds_since_last_delivery, num_deliveries
    ) = response[0]

    assert pending_message_id == message_id
    assert consumer_name == b'test_consumer'
    assert milliseconds_since_last_delivery >= 50
    assert num_deliveries == 1
411
412
async def test_xpending_start_of_zero(redis):
    """xpending() accepts the integer 0 as its start argument."""
    stream, group = 'test_stream', 'test_group'
    await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group, latest_id='0')

    # Passing if this call does not raise a ValueError
    await redis.xpending(stream, group, 0, '+', 10)
418
419
async def test_xclaim_simple(redis):
    """Put a message in a pending state, then reclaim it with XCLAIM."""
    stream, group = 'test_stream', 'test_group'
    added_id = await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group, latest_id='0')
    await redis.xread_group(
        'test_group', 'test_consumer', ['test_stream'],
        timeout=1000, latest_ids=['>']
    )

    # The entry is pending for the consumer that read it
    num_pending, lowest_id, highest_id, pel = \
        await redis.xpending(stream, group)
    assert num_pending == 1
    assert pel == [[b'test_consumer', b'1']]

    # Hand it over to a different consumer
    claimed = await redis.xclaim(stream, group, 'new_consumer',
                                 min_idle_time=0, id=added_id)
    assert claimed
    claimed_id, claimed_fields = claimed[0]
    assert claimed_id == added_id
    assert claimed_fields == {b'a': b'1'}

    # Ok, now see how things look: still one pending, under the new owner
    num_pending, lowest_id, highest_id, pel = \
        await redis.xpending(stream, group)
    assert num_pending == 1
    assert pel == [[b'new_consumer', b'1']]
448
449
async def test_xclaim_min_idle_time_includes_messages(redis):
    """XCLAIM returns an entry that has been idle longer than min_idle_time."""
    added_id = await redis.xadd('test_stream', {'a': 1})
    await redis.xgroup_create('test_stream', 'test_group', latest_id='0')
    await redis.xread_group(
        'test_group', 'test_consumer', ['test_stream'],
        timeout=1000, latest_ids=['>']
    )

    # The entry is pending; let it sit idle for 100ms
    await asyncio.sleep(0.1)

    # Claim with a 50ms idle threshold, which the entry now exceeds
    claimed = await redis.xclaim('test_stream', 'test_group', 'new_consumer',
                                 min_idle_time=50, id=added_id)
    assert claimed
465
466
async def test_xclaim_min_idle_time_excludes_messages(redis):
    """XCLAIM returns nothing for an entry idle less than min_idle_time."""
    added_id = await redis.xadd('test_stream', {'a': 1})
    await redis.xgroup_create('test_stream', 'test_group', latest_id='0')
    await redis.xread_group(
        'test_group', 'test_consumer', ['test_stream'],
        timeout=1000, latest_ids=['>']
    )

    # The entry is pending, but we deliberately do not wait before claiming,
    # so it has not yet been idle for the 50ms threshold used below
    claimed = await redis.xclaim('test_stream', 'test_group', 'new_consumer',
                                 min_idle_time=50, id=added_id)

    # Nothing was eligible to be claimed
    assert not claimed
481
482
async def test_xgroup_delconsumer(redis, create_redis, server):
    """Deleting a consumer returns 0 and leaves the group without consumers."""
    stream, group, consumer = 'test_stream', 'test_group', 'test_consumer'
    await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group)

    # A consumer only exists once it has read, not merely connected,
    # so issue a read (from ID 0) before trying to delete it
    await redis.xread_group(
        'test_group', 'test_consumer',
        streams=['test_stream'], latest_ids=[0]
    )

    deleted = await redis.xgroup_delconsumer(stream, group, consumer)
    assert deleted == 0

    consumers = await redis.xinfo_consumers(stream, group)
    assert not consumers
501
502
async def test_xdel_stream(redis):
    """xdel() on the entry just added reports one deleted entry."""
    message_id = await redis.xadd('test_stream', {'a': 1})
    response = await redis.xdel('test_stream', id=message_id)
    # XDEL replies with the number of entries deleted; exactly one entry
    # was targeted. The previous ``>= 0`` check could never fail.
    assert response == 1
507
508
async def test_xtrim_stream(redis):
    """xtrim() with an inexact max_len returns a non-negative count."""
    for field_name in ('a', 'b', 'c'):
        await redis.xadd('test_stream', {field_name: 1})

    trimmed = await redis.xtrim('test_stream', max_len=1, exact_len=False)
    assert trimmed >= 0
515
516
async def test_xlen_stream(redis):
    """xlen() reports the single entry that was added to the stream."""
    await redis.xadd('test_stream', {'a': 1})
    response = await redis.xlen('test_stream')
    # Exactly one entry was added above (other tests in this module rely on
    # 'test_stream' starting empty). The previous ``>= 0`` check could
    # never fail.
    assert response == 1
521
522
async def test_xinfo_consumers(redis):
    """xinfo_consumers() returns a non-empty result whose first item is a dict."""
    stream, group = 'test_stream', 'test_group'
    await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group)

    # A consumer only exists once it has read, not merely connected,
    # so issue a read (from ID 0) to make sure there is one to report
    await redis.xread_group(
        'test_group', 'test_consumer',
        streams=['test_stream'], latest_ids=[0]
    )

    consumers = await redis.xinfo_consumers(stream, group)
    assert consumers
    assert isinstance(consumers[0], dict)
538
539
async def test_xinfo_stream(redis):
    """xinfo_stream() and xinfo() each return a non-empty dict."""
    stream, group = 'test_stream', 'test_group'
    await redis.xadd(stream, {'a': 1})
    await redis.xgroup_create(stream, group)

    # A consumer only exists once it has read, not merely connected,
    # so issue a read (from ID 0) before asking for stream info
    await redis.xread_group(
        'test_group', 'test_consumer',
        streams=['test_stream'], latest_ids=[0]
    )

    stream_info = await redis.xinfo_stream(stream)
    assert stream_info
    assert isinstance(stream_info, dict)

    # The same checks, this time through xinfo()
    plain_info = await redis.xinfo(stream)
    assert plain_info
    assert isinstance(plain_info, dict)
559
560
async def test_xinfo_help(redis):
    """xinfo_help() returns something truthy."""
    help_reply = await redis.xinfo_help()
    assert help_reply
564
565
@pytest.mark.parametrize('param', [0.1, '1'])
async def test_xread_param_types(redis, param):
    """xread() raises TypeError for a float or string ``timeout``."""
    streams = ["system_event_stream"]
    with pytest.raises(TypeError):
        await redis.xread(streams, timeout=param, latest_ids=[0])
573
574
def test_parse_messages_ok():
    """A flat field/value list is parsed into a dict paired with the ID."""
    raw = [(b'123', [b'f1', b'v1', b'f2', b'v2'])]
    expected = [(b'123', {b'f1': b'v1', b'f2': b'v2'})]
    assert parse_messages(raw) == expected
578
579
def test_parse_messages_null_fields():
    """An entry whose fields value is None is dropped from the result."""
    # Redis can sometimes respond with a fields value of 'null',
    # so make sure that case is handled sensibly
    raw = [(b'123', None)]
    parsed = parse_messages(raw)
    assert parsed == []
585
586
def test_parse_messages_null_message():
    """A message that is itself None is dropped from the result."""
    # Here the whole message is None rather than just its fields value,
    # so make sure that case is handled sensibly too
    raw = [None]
    parsed = parse_messages(raw)
    assert parsed == []
592