1import pytest 2import asyncio 3 4from collections import OrderedDict 5from unittest import mock 6 7from aioredis.commands.streams import parse_messages 8from aioredis.errors import BusyGroupError 9from _testutils import redis_version 10 11pytestmark = redis_version( 12 5, 0, 0, reason="Streams only available since Redis 5.0.0") 13 14 15async def add_message_with_sleep(redis, stream, fields): 16 await asyncio.sleep(0.2) 17 result = await redis.xadd(stream, fields) 18 return result 19 20 21async def test_xadd(redis, server_bin): 22 fields = OrderedDict(( 23 (b'field1', b'value1'), 24 (b'field2', b'value2'), 25 )) 26 message_id = await redis.xadd('test_stream', fields) 27 28 # Check the result is in the expected format (i.e: 1507400517949-0) 29 assert b'-' in message_id 30 timestamp, sequence = message_id.split(b'-') 31 assert timestamp.isdigit() 32 assert sequence.isdigit() 33 34 # Read it back 35 messages = await redis.xrange('test_stream') 36 assert len(messages) == 1 37 message = messages[0] 38 assert message[0] == message_id 39 assert message[1] == OrderedDict([ 40 (b'field1', b'value1'), 41 (b'field2', b'value2')] 42 ) 43 44 45async def test_xadd_maxlen_exact(redis, server_bin): 46 message_id1 = await redis.xadd('test_stream', {'f1': 'v1'}) # noqa 47 48 # Ensure the millisecond-based message ID increments 49 await asyncio.sleep(0.001) 50 message_id2 = await redis.xadd('test_stream', {'f2': 'v2'}) 51 await asyncio.sleep(0.001) 52 message_id3 = await redis.xadd('test_stream', {'f3': 'v3'}, 53 max_len=2, exact_len=True) 54 55 # Read it back 56 messages = await redis.xrange('test_stream') 57 assert len(messages) == 2 58 59 message2 = messages[0] 60 message3 = messages[1] 61 62 # The first message should no longer exist, just messages 63 # 2 and 3 remain 64 assert message2[0] == message_id2 65 assert message2[1] == OrderedDict([(b'f2', b'v2')]) 66 67 assert message3[0] == message_id3 68 assert message3[1] == OrderedDict([(b'f3', b'v3')]) 69 70 71async def test_xadd_manual_message_ids(redis, server_bin): 72 await redis.xadd('test_stream', {'f1': 'v1'}, message_id='1515958771000-0') 73 await redis.xadd('test_stream', {'f1': 'v1'}, message_id='1515958771000-1') 74 await redis.xadd('test_stream', {'f1': 'v1'}, message_id='1515958772000-0') 75 76 messages = await redis.xrange('test_stream') 77 message_ids = [message_id for message_id, _ in messages] 78 assert message_ids == [ 79 b'1515958771000-0', 80 b'1515958771000-1', 81 b'1515958772000-0' 82 ] 83 84 85async def test_xadd_maxlen_inexact(redis, server_bin): 86 await redis.xadd('test_stream', {'f1': 'v1'}) 87 # Ensure the millisecond-based message ID increments 88 await asyncio.sleep(0.001) 89 await redis.xadd('test_stream', {'f2': 'v2'}) 90 await asyncio.sleep(0.001) 91 await redis.xadd('test_stream', {'f3': 'v3'}, max_len=2, exact_len=False) 92 93 # Read it back 94 messages = await redis.xrange('test_stream') 95 # Redis will not have removed the whole node yet 96 assert len(messages) == 3 97 98 # Check the stream is eventually truncated 99 for x in range(0, 1000): 100 await redis.xadd('test_stream', {'f': 'v'}, max_len=2) 101 102 messages = await redis.xrange('test_stream') 103 assert len(messages) < 1000 104 105 106async def test_xrange(redis, server_bin): 107 stream = 'test_stream' 108 fields = OrderedDict(( 109 (b'field1', b'value1'), 110 (b'field2', b'value2'), 111 )) 112 message_id1 = await redis.xadd(stream, fields) 113 message_id2 = await redis.xadd(stream, fields) 114 message_id3 = await redis.xadd(stream, fields) # noqa 115 116 # Test no parameters 117 messages = await redis.xrange(stream) 118 assert len(messages) == 3 119 message = messages[0] 120 assert message[0] == message_id1 121 assert message[1] == OrderedDict([ 122 (b'field1', b'value1'), 123 (b'field2', b'value2')] 124 ) 125 126 # Test start 127 messages = await redis.xrange(stream, start=message_id2) 128 assert len(messages) == 2 129 130 messages = await redis.xrange(stream, start='9900000000000-0') 131 assert len(messages) == 0 132 133 # Test stop 134 messages = await redis.xrange(stream, stop='0000000000000-0') 135 assert len(messages) == 0 136 137 messages = await redis.xrange(stream, stop=message_id2) 138 assert len(messages) == 2 139 140 messages = await redis.xrange(stream, stop='9900000000000-0') 141 assert len(messages) == 3 142 143 # Test start & stop 144 messages = await redis.xrange(stream, 145 start=message_id1, 146 stop=message_id2) 147 assert len(messages) == 2 148 149 messages = await redis.xrange(stream, 150 start='0000000000000-0', 151 stop='9900000000000-0') 152 assert len(messages) == 3 153 154 # Test count 155 messages = await redis.xrange(stream, count=2) 156 assert len(messages) == 2 157 158 159async def test_xrevrange(redis, server_bin): 160 stream = 'test_stream' 161 fields = OrderedDict(( 162 (b'field1', b'value1'), 163 (b'field2', b'value2'), 164 )) 165 message_id1 = await redis.xadd(stream, fields) 166 message_id2 = await redis.xadd(stream, fields) 167 message_id3 = await redis.xadd(stream, fields) # noqa 168 169 # Test no parameters 170 messages = await redis.xrevrange(stream) 171 assert len(messages) == 3 172 message = messages[0] 173 assert message[0] == message_id3 174 assert message[1] == OrderedDict([ 175 (b'field1', b'value1'), 176 (b'field2', b'value2')] 177 ) 178 179 # Test start 180 messages = await redis.xrevrange(stream, start=message_id2) 181 assert len(messages) == 2 182 183 messages = await redis.xrevrange(stream, start='9900000000000-0') 184 assert len(messages) == 3 185 186 # Test stop 187 messages = await redis.xrevrange(stream, stop='0000000000000-0') 188 assert len(messages) == 3 189 190 messages = await redis.xrevrange(stream, stop=message_id2) 191 assert len(messages) == 2 192 193 messages = await redis.xrevrange(stream, stop='9900000000000-0') 194 assert len(messages) == 0 195 196 # Test start & stop 197 messages = await redis.xrevrange(stream, 198 start=message_id2, 199 stop=message_id1) 200 assert len(messages) == 2 201 202 messages = await redis.xrevrange(stream, 203 start='9900000000000-0', 204 stop='0000000000000-0') 205 assert len(messages) == 3 206 207 # Test count 208 messages = await redis.xrevrange(stream, count=2) 209 assert len(messages) == 2 210 211 212async def test_xread_selection(redis, server_bin): 213 """Test use of counts and starting IDs""" 214 stream = 'test_stream' 215 fields = OrderedDict(( 216 (b'field1', b'value1'), 217 (b'field2', b'value2'), 218 )) 219 message_id1 = await redis.xadd(stream, fields) 220 message_id2 = await redis.xadd(stream, fields) # noqa 221 message_id3 = await redis.xadd(stream, fields) 222 223 messages = await redis.xread([stream], 224 timeout=1, 225 latest_ids=['0000000000000-0']) 226 assert len(messages) == 3 227 228 messages = await redis.xread([stream], 229 timeout=1, 230 latest_ids=[message_id1]) 231 assert len(messages) == 2 232 233 messages = await redis.xread([stream], 234 timeout=1, 235 latest_ids=[message_id3]) 236 assert len(messages) == 0 237 238 messages = await redis.xread([stream], 239 timeout=1, 240 latest_ids=['0000000000000-0'], count=2) 241 assert len(messages) == 2 242 243 244async def test_xread_blocking(redis, create_redis, server, server_bin): 245 """Test the blocking read features""" 246 fields = OrderedDict(( 247 (b'field1', b'value1'), 248 (b'field2', b'value2'), 249 )) 250 other_redis = await create_redis( 251 server.tcp_address) 252 253 # create blocking task in separate connection 254 consumer = other_redis.xread(['test_stream'], timeout=1000) 255 256 producer_task = asyncio.Task( 257 add_message_with_sleep(redis, 'test_stream', fields)) 258 results = await asyncio.gather(consumer, producer_task) 259 260 received_messages, sent_message_id = results 261 assert len(received_messages) == 1 262 assert sent_message_id 263 264 received_stream, received_message_id, received_fields \ 265 = received_messages[0] 266 267 assert received_stream == b'test_stream' 268 assert sent_message_id == received_message_id 269 assert fields == received_fields 270 271 # Test that we get nothing back from an empty stream 272 results = await redis.xread(['another_stream'], timeout=100) 273 assert results == [] 274 275 other_redis.close() 276 277 278async def test_xgroup_create(redis, server_bin): 279 # Also tests xinfo_groups() 280 await redis.xadd('test_stream', {'a': 1}) 281 await redis.xgroup_create('test_stream', 'test_group') 282 info = await redis.xinfo_groups('test_stream') 283 assert info == [{ 284 b'name': b'test_group', 285 b'last-delivered-id': mock.ANY, 286 b'pending': 0, 287 b'consumers': 0 288 }] 289 290 291async def test_xgroup_create_mkstream(redis, server_bin): 292 await redis.xgroup_create('test_stream', 'test_group', mkstream=True) 293 info = await redis.xinfo_groups('test_stream') 294 assert info == [{ 295 b'name': b'test_group', 296 b'last-delivered-id': mock.ANY, 297 b'pending': 0, 298 b'consumers': 0 299 }] 300 301 302async def test_xgroup_create_already_exists(redis, server_bin): 303 await redis.xadd('test_stream', {'a': 1}) 304 await redis.xgroup_create('test_stream', 'test_group') 305 with pytest.raises(BusyGroupError): 306 await redis.xgroup_create('test_stream', 'test_group') 307 308 309async def test_xgroup_setid(redis, server_bin): 310 await redis.xadd('test_stream', {'a': 1}) 311 await redis.xgroup_create('test_stream', 'test_group') 312 await redis.xgroup_setid('test_stream', 'test_group', '$') 313 314 315async def test_xgroup_destroy(redis, server_bin): 316 await redis.xadd('test_stream', {'a': 1}) 317 await redis.xgroup_create('test_stream', 'test_group') 318 await redis.xgroup_destroy('test_stream', 'test_group') 319 info = await redis.xinfo_groups('test_stream') 320 assert not info 321 322 323async def test_xread_group(redis): 324 await redis.xadd('test_stream', {'a': 1}) 325 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 326 327 # read all pending messages 328 messages = await redis.xread_group( 329 'test_group', 'test_consumer', ['test_stream'], 330 timeout=1000, latest_ids=['>'] 331 ) 332 assert len(messages) == 1 333 stream, message_id, fields = messages[0] 334 assert stream == b'test_stream' 335 assert message_id 336 assert fields == {b'a': b'1'} 337 338 339async def test_xread_group_with_no_ack(redis): 340 await redis.xadd('test_stream', {'a': 1}) 341 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 342 343 # read all pending messages 344 messages = await redis.xread_group( 345 'test_group', 'test_consumer', ['test_stream'], 346 timeout=1000, latest_ids=['>'], no_ack=True 347 ) 348 assert len(messages) == 1 349 stream, message_id, fields = messages[0] 350 assert stream == b'test_stream' 351 assert message_id 352 assert fields == {b'a': b'1'} 353 354 355async def test_xack_and_xpending(redis): 356 # Test a full xread -> xack cycle, using xpending to check the status 357 message_id = await redis.xadd('test_stream', {'a': 1}) 358 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 359 360 # Nothing pending as we haven't claimed anything yet 361 pending_count, min_id, max_id, count = \ 362 await redis.xpending('test_stream', 'test_group') 363 assert pending_count == 0 364 365 # Read the message 366 await redis.xread_group( 367 'test_group', 'test_consumer', ['test_stream'], 368 timeout=1000, latest_ids=['>'] 369 ) 370 371 # It is now pending 372 pending_count, min_id, max_id, pel = \ 373 await redis.xpending('test_stream', 'test_group') 374 assert pending_count == 1 375 assert min_id == message_id 376 assert max_id == message_id 377 assert pel == [[b'test_consumer', b'1']] 378 379 # Acknowledge the message 380 await redis.xack('test_stream', 'test_group', message_id) 381 382 # It is no longer pending 383 pending_count, min_id, max_id, pel = \ 384 await redis.xpending('test_stream', 'test_group') 385 assert pending_count == 0 386 387 388async def test_xpending_get_messages(redis): 389 # Like test_xack_and_xpending(), but using the start/end xpending() 390 # params to get the messages 391 message_id = await redis.xadd('test_stream', {'a': 1}) 392 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 393 await redis.xread_group( 394 'test_group', 'test_consumer', ['test_stream'], 395 timeout=1000, latest_ids=['>'] 396 ) 397 await asyncio.sleep(0.05) 398 399 # It is now pending 400 response = await redis.xpending('test_stream', 'test_group', '-', '+', 10) 401 assert len(response) == 1 402 ( 403 message_id, consumer_name, 404 milliseconds_since_last_delivery, num_deliveries 405 ) = response[0] 406 407 assert message_id 408 assert consumer_name == b'test_consumer' 409 assert milliseconds_since_last_delivery >= 50 410 assert num_deliveries == 1 411 412 413async def test_xpending_start_of_zero(redis): 414 await redis.xadd('test_stream', {'a': 1}) 415 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 416 # Doesn't raise a value error 417 await redis.xpending('test_stream', 'test_group', 0, '+', 10) 418 419 420async def test_xclaim_simple(redis): 421 # Put a message in a pending state then reclaim it is XCLAIM 422 message_id = await redis.xadd('test_stream', {'a': 1}) 423 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 424 await redis.xread_group( 425 'test_group', 'test_consumer', ['test_stream'], 426 timeout=1000, latest_ids=['>'] 427 ) 428 429 # Message is now pending 430 pending_count, min_id, max_id, pel = \ 431 await redis.xpending('test_stream', 'test_group') 432 assert pending_count == 1 433 assert pel == [[b'test_consumer', b'1']] 434 435 # Now claim it for another consumer 436 result = await redis.xclaim('test_stream', 'test_group', 'new_consumer', 437 min_idle_time=0, id=message_id) 438 assert result 439 claimed_message_id, fields = result[0] 440 assert claimed_message_id == message_id 441 assert fields == {b'a': b'1'} 442 443 # Ok, no see how things look 444 pending_count, min_id, max_id, pel = \ 445 await redis.xpending('test_stream', 'test_group') 446 assert pending_count == 1 447 assert pel == [[b'new_consumer', b'1']] 448 449 450async def test_xclaim_min_idle_time_includes_messages(redis): 451 message_id = await redis.xadd('test_stream', {'a': 1}) 452 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 453 await redis.xread_group( 454 'test_group', 'test_consumer', ['test_stream'], 455 timeout=1000, latest_ids=['>'] 456 ) 457 458 # Message is now pending. Wait 100ms 459 await asyncio.sleep(0.1) 460 461 # Now reclaim any messages which have been idle for > 50ms 462 result = await redis.xclaim('test_stream', 'test_group', 'new_consumer', 463 min_idle_time=50, id=message_id) 464 assert result 465 466 467async def test_xclaim_min_idle_time_excludes_messages(redis): 468 message_id = await redis.xadd('test_stream', {'a': 1}) 469 await redis.xgroup_create('test_stream', 'test_group', latest_id='0') 470 await redis.xread_group( 471 'test_group', 'test_consumer', ['test_stream'], 472 timeout=1000, latest_ids=['>'] 473 ) 474 # Message is now pending. Wait no time at all 475 476 # Now reclaim any messages which have been idle for > 50ms 477 result = await redis.xclaim('test_stream', 'test_group', 'new_consumer', 478 min_idle_time=50, id=message_id) 479 # Nothing to claim 480 assert not result 481 482 483async def test_xgroup_delconsumer(redis, create_redis, server): 484 await redis.xadd('test_stream', {'a': 1}) 485 await redis.xgroup_create('test_stream', 'test_group') 486 487 # Note that consumers are only created once they read a message, 488 # not when they first connect. So make sure we consume from ID 0 489 # so we get the messages we just XADDed (above) 490 await redis.xread_group( 491 'test_group', 'test_consumer', 492 streams=['test_stream'], latest_ids=[0] 493 ) 494 495 response = await redis.xgroup_delconsumer( 496 'test_stream', 'test_group', 'test_consumer' 497 ) 498 assert response == 0 499 info = await redis.xinfo_consumers('test_stream', 'test_group') 500 assert not info 501 502 503async def test_xdel_stream(redis): 504 message_id = await redis.xadd('test_stream', {'a': 1}) 505 response = await redis.xdel('test_stream', id=message_id) 506 assert response >= 0 507 508 509async def test_xtrim_stream(redis): 510 await redis.xadd('test_stream', {'a': 1}) 511 await redis.xadd('test_stream', {'b': 1}) 512 await redis.xadd('test_stream', {'c': 1}) 513 response = await redis.xtrim('test_stream', max_len=1, exact_len=False) 514 assert response >= 0 515 516 517async def test_xlen_stream(redis): 518 await redis.xadd('test_stream', {'a': 1}) 519 response = await redis.xlen('test_stream') 520 assert response >= 0 521 522 523async def test_xinfo_consumers(redis): 524 await redis.xadd('test_stream', {'a': 1}) 525 await redis.xgroup_create('test_stream', 'test_group') 526 527 # Note that consumers are only created once they read a message, 528 # not when they first connect. So make sure we consume from ID 0 529 # so we get the messages we just XADDed (above) 530 await redis.xread_group( 531 'test_group', 'test_consumer', 532 streams=['test_stream'], latest_ids=[0] 533 ) 534 535 info = await redis.xinfo_consumers('test_stream', 'test_group') 536 assert info 537 assert isinstance(info[0], dict) 538 539 540async def test_xinfo_stream(redis): 541 await redis.xadd('test_stream', {'a': 1}) 542 await redis.xgroup_create('test_stream', 'test_group') 543 544 # Note that consumers are only created once they read a message, 545 # not when they first connect. So make sure we consume from ID 0 546 # so we get the messages we just XADDed (above) 547 await redis.xread_group( 548 'test_group', 'test_consumer', 549 streams=['test_stream'], latest_ids=[0] 550 ) 551 552 info = await redis.xinfo_stream('test_stream') 553 assert info 554 assert isinstance(info, dict) 555 556 info = await redis.xinfo('test_stream') 557 assert info 558 assert isinstance(info, dict) 559 560 561async def test_xinfo_help(redis): 562 info = await redis.xinfo_help() 563 assert info 564 565 566@pytest.mark.parametrize('param', [0.1, '1']) 567async def test_xread_param_types(redis, param): 568 with pytest.raises(TypeError): 569 await redis.xread( 570 ["system_event_stream"], 571 timeout=param, latest_ids=[0] 572 ) 573 574 575def test_parse_messages_ok(): 576 message = [(b'123', [b'f1', b'v1', b'f2', b'v2'])] 577 assert parse_messages(message) == [(b'123', {b'f1': b'v1', b'f2': b'v2'})] 578 579 580def test_parse_messages_null_fields(): 581 # Redis can sometimes respond with a fields value of 'null', 582 # so ensure we handle that sensibly 583 message = [(b'123', None)] 584 assert parse_messages(message) == [] 585 586 587def test_parse_messages_null_message(): 588 # Redis can sometimes respond with a fields value of 'null', 589 # so ensure we handle that sensibly 590 message = [None] 591 assert parse_messages(message) == [] 592