1# pylint: skip-file 2from __future__ import absolute_import 3import time 4 5import pytest 6 7from kafka.client_async import KafkaClient 8from kafka.consumer.subscription_state import ( 9 SubscriptionState, ConsumerRebalanceListener) 10from kafka.coordinator.assignors.range import RangePartitionAssignor 11from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor 12from kafka.coordinator.base import Generation, MemberState, HeartbeatThread 13from kafka.coordinator.consumer import ConsumerCoordinator 14from kafka.coordinator.protocol import ( 15 ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) 16import kafka.errors as Errors 17from kafka.future import Future 18from kafka.metrics import Metrics 19from kafka.protocol.commit import ( 20 OffsetCommitRequest, OffsetCommitResponse, 21 OffsetFetchRequest, OffsetFetchResponse) 22from kafka.protocol.metadata import MetadataResponse 23from kafka.structs import TopicPartition, OffsetAndMetadata 24from kafka.util import WeakMethod 25 26 27@pytest.fixture 28def client(conn): 29 return KafkaClient(api_version=(0, 9)) 30 31@pytest.fixture 32def coordinator(client): 33 return ConsumerCoordinator(client, SubscriptionState(), Metrics()) 34 35 36def test_init(client, coordinator): 37 # metadata update on init 38 assert client.cluster._need_update is True 39 assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners 40 41 42@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) 43def test_autocommit_enable_api_version(client, api_version): 44 coordinator = ConsumerCoordinator(client, SubscriptionState(), 45 Metrics(), 46 enable_auto_commit=True, 47 session_timeout_ms=30000, # session_timeout_ms and max_poll_interval_ms 48 max_poll_interval_ms=30000, # should be the same to avoid KafkaConfigurationError 49 group_id='foobar', 50 api_version=api_version) 51 if api_version < (0, 8, 1): 52 assert coordinator.config['enable_auto_commit'] is False 53 else: 54 assert coordinator.config['enable_auto_commit'] is True 55 56 57def test_protocol_type(coordinator): 58 assert coordinator.protocol_type() is 'consumer' 59 60 61def test_group_protocols(coordinator): 62 # Requires a subscription 63 try: 64 coordinator.group_protocols() 65 except Errors.IllegalStateError: 66 pass 67 else: 68 assert False, 'Exception not raised when expected' 69 70 coordinator._subscription.subscribe(topics=['foobar']) 71 assert coordinator.group_protocols() == [ 72 ('range', ConsumerProtocolMemberMetadata( 73 RangePartitionAssignor.version, 74 ['foobar'], 75 b'')), 76 ('roundrobin', ConsumerProtocolMemberMetadata( 77 RoundRobinPartitionAssignor.version, 78 ['foobar'], 79 b'')), 80 ] 81 82 83@pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) 84def test_pattern_subscription(coordinator, api_version): 85 coordinator.config['api_version'] = api_version 86 coordinator._subscription.subscribe(pattern='foo') 87 assert coordinator._subscription.subscription == set([]) 88 assert coordinator._metadata_snapshot == coordinator._build_metadata_snapshot(coordinator._subscription, {}) 89 90 cluster = coordinator._client.cluster 91 cluster.update_metadata(MetadataResponse[0]( 92 # brokers 93 [(0, 'foo', 12), (1, 'bar', 34)], 94 # topics 95 [(0, 'fizz', []), 96 (0, 'foo1', [(0, 0, 0, [], [])]), 97 (0, 'foo2', [(0, 0, 1, [], [])])])) 98 assert coordinator._subscription.subscription == set(['foo1', 'foo2']) 99 100 # 0.9 consumers should trigger dynamic partition assignment 101 if api_version >= (0, 9): 102 assert coordinator._subscription.assignment == {} 103 104 # earlier consumers get all partitions assigned locally 105 else: 106 assert set(coordinator._subscription.assignment.keys()) == set([ 107 TopicPartition('foo1', 0), 108 TopicPartition('foo2', 0)]) 109 110 111def test_lookup_assignor(coordinator): 112 assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor 113 assert coordinator._lookup_assignor('range') is RangePartitionAssignor 114 assert coordinator._lookup_assignor('foobar') is None 115 116 117def test_join_complete(mocker, coordinator): 118 coordinator._subscription.subscribe(topics=['foobar']) 119 assignor = RoundRobinPartitionAssignor() 120 coordinator.config['assignors'] = (assignor,) 121 mocker.spy(assignor, 'on_assignment') 122 assert assignor.on_assignment.call_count == 0 123 assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') 124 coordinator._on_join_complete( 125 0, 'member-foo', 'roundrobin', assignment.encode()) 126 assert assignor.on_assignment.call_count == 1 127 assignor.on_assignment.assert_called_with(assignment) 128 129 130def test_subscription_listener(mocker, coordinator): 131 listener = mocker.MagicMock(spec=ConsumerRebalanceListener) 132 coordinator._subscription.subscribe( 133 topics=['foobar'], 134 listener=listener) 135 136 coordinator._on_join_prepare(0, 'member-foo') 137 assert listener.on_partitions_revoked.call_count == 1 138 listener.on_partitions_revoked.assert_called_with(set([])) 139 140 assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') 141 coordinator._on_join_complete( 142 0, 'member-foo', 'roundrobin', assignment.encode()) 143 assert listener.on_partitions_assigned.call_count == 1 144 listener.on_partitions_assigned.assert_called_with(set([ 145 TopicPartition('foobar', 0), 146 TopicPartition('foobar', 1)])) 147 148 149def test_subscription_listener_failure(mocker, coordinator): 150 listener = mocker.MagicMock(spec=ConsumerRebalanceListener) 151 coordinator._subscription.subscribe( 152 topics=['foobar'], 153 listener=listener) 154 155 # exception raised in listener should not be re-raised by coordinator 156 listener.on_partitions_revoked.side_effect = Exception('crash') 157 coordinator._on_join_prepare(0, 'member-foo') 158 assert listener.on_partitions_revoked.call_count == 1 159 160 assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') 161 coordinator._on_join_complete( 162 0, 'member-foo', 'roundrobin', assignment.encode()) 163 assert listener.on_partitions_assigned.call_count == 1 164 165 166def test_perform_assignment(mocker, coordinator): 167 member_metadata = { 168 'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''), 169 'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b'') 170 } 171 assignments = { 172 'member-foo': ConsumerProtocolMemberAssignment( 173 0, [('foo1', [0])], b''), 174 'member-bar': ConsumerProtocolMemberAssignment( 175 0, [('foo1', [1])], b'') 176 } 177 178 mocker.patch.object(RoundRobinPartitionAssignor, 'assign') 179 RoundRobinPartitionAssignor.assign.return_value = assignments 180 181 ret = coordinator._perform_assignment( 182 'member-foo', 'roundrobin', 183 [(member, metadata.encode()) 184 for member, metadata in member_metadata.items()]) 185 186 assert RoundRobinPartitionAssignor.assign.call_count == 1 187 RoundRobinPartitionAssignor.assign.assert_called_with( 188 coordinator._client.cluster, member_metadata) 189 assert ret == assignments 190 191 192def test_on_join_prepare(coordinator): 193 coordinator._subscription.subscribe(topics=['foobar']) 194 coordinator._on_join_prepare(0, 'member-foo') 195 196 197def test_need_rejoin(coordinator): 198 # No subscription - no rejoin 199 assert coordinator.need_rejoin() is False 200 201 coordinator._subscription.subscribe(topics=['foobar']) 202 assert coordinator.need_rejoin() is True 203 204 205def test_refresh_committed_offsets_if_needed(mocker, coordinator): 206 mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', 207 return_value = { 208 TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), 209 TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')}) 210 coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)]) 211 assert coordinator._subscription.needs_fetch_committed_offsets is True 212 coordinator.refresh_committed_offsets_if_needed() 213 assignment = coordinator._subscription.assignment 214 assert assignment[TopicPartition('foobar', 0)].committed == 123 215 assert TopicPartition('foobar', 1) not in assignment 216 assert coordinator._subscription.needs_fetch_committed_offsets is False 217 218 219def test_fetch_committed_offsets(mocker, coordinator): 220 221 # No partitions, no IO polling 222 mocker.patch.object(coordinator._client, 'poll') 223 assert coordinator.fetch_committed_offsets([]) == {} 224 assert coordinator._client.poll.call_count == 0 225 226 # general case -- send offset fetch request, get successful future 227 mocker.patch.object(coordinator, 'ensure_coordinator_ready') 228 mocker.patch.object(coordinator, '_send_offset_fetch_request', 229 return_value=Future().success('foobar')) 230 partitions = [TopicPartition('foobar', 0)] 231 ret = coordinator.fetch_committed_offsets(partitions) 232 assert ret == 'foobar' 233 coordinator._send_offset_fetch_request.assert_called_with(partitions) 234 assert coordinator._client.poll.call_count == 1 235 236 # Failed future is raised if not retriable 237 coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError) 238 coordinator._client.poll.reset_mock() 239 try: 240 coordinator.fetch_committed_offsets(partitions) 241 except AssertionError: 242 pass 243 else: 244 assert False, 'Exception not raised when expected' 245 assert coordinator._client.poll.call_count == 1 246 247 coordinator._client.poll.reset_mock() 248 coordinator._send_offset_fetch_request.side_effect = [ 249 Future().failure(Errors.RequestTimedOutError), 250 Future().success('fizzbuzz')] 251 252 ret = coordinator.fetch_committed_offsets(partitions) 253 assert ret == 'fizzbuzz' 254 assert coordinator._client.poll.call_count == 2 # call + retry 255 256 257def test_close(mocker, coordinator): 258 mocker.patch.object(coordinator, '_maybe_auto_commit_offsets_sync') 259 mocker.patch.object(coordinator, '_handle_leave_group_response') 260 mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) 261 coordinator.coordinator_id = 0 262 coordinator._generation = Generation(1, 'foobar', b'') 263 coordinator.state = MemberState.STABLE 264 cli = coordinator._client 265 mocker.patch.object(cli, 'send', return_value=Future().success('foobar')) 266 mocker.patch.object(cli, 'poll') 267 268 coordinator.close() 269 assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1 270 coordinator._handle_leave_group_response.assert_called_with('foobar') 271 272 assert coordinator.generation() is None 273 assert coordinator._generation is Generation.NO_GENERATION 274 assert coordinator.state is MemberState.UNJOINED 275 assert coordinator.rejoin_needed is True 276 277 278@pytest.fixture 279def offsets(): 280 return { 281 TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), 282 TopicPartition('foobar', 1): OffsetAndMetadata(234, b''), 283 } 284 285 286def test_commit_offsets_async(mocker, coordinator, offsets): 287 mocker.patch.object(coordinator._client, 'poll') 288 mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) 289 mocker.patch.object(coordinator, 'ensure_coordinator_ready') 290 mocker.patch.object(coordinator, '_send_offset_commit_request', 291 return_value=Future().success('fizzbuzz')) 292 coordinator.commit_offsets_async(offsets) 293 assert coordinator._send_offset_commit_request.call_count == 1 294 295 296def test_commit_offsets_sync(mocker, coordinator, offsets): 297 mocker.patch.object(coordinator, 'ensure_coordinator_ready') 298 mocker.patch.object(coordinator, '_send_offset_commit_request', 299 return_value=Future().success('fizzbuzz')) 300 cli = coordinator._client 301 mocker.patch.object(cli, 'poll') 302 303 # No offsets, no calls 304 assert coordinator.commit_offsets_sync({}) is None 305 assert coordinator._send_offset_commit_request.call_count == 0 306 assert cli.poll.call_count == 0 307 308 ret = coordinator.commit_offsets_sync(offsets) 309 assert coordinator._send_offset_commit_request.call_count == 1 310 assert cli.poll.call_count == 1 311 assert ret == 'fizzbuzz' 312 313 # Failed future is raised if not retriable 314 coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError) 315 coordinator._client.poll.reset_mock() 316 try: 317 coordinator.commit_offsets_sync(offsets) 318 except AssertionError: 319 pass 320 else: 321 assert False, 'Exception not raised when expected' 322 assert coordinator._client.poll.call_count == 1 323 324 coordinator._client.poll.reset_mock() 325 coordinator._send_offset_commit_request.side_effect = [ 326 Future().failure(Errors.RequestTimedOutError), 327 Future().success('fizzbuzz')] 328 329 ret = coordinator.commit_offsets_sync(offsets) 330 assert ret == 'fizzbuzz' 331 assert coordinator._client.poll.call_count == 2 # call + retry 332 333 334@pytest.mark.parametrize( 335 'api_version,group_id,enable,error,has_auto_commit,commit_offsets,warn,exc', [ 336 ((0, 8, 0), 'foobar', True, None, False, False, True, False), 337 ((0, 8, 1), 'foobar', True, None, True, True, False, False), 338 ((0, 8, 2), 'foobar', True, None, True, True, False, False), 339 ((0, 9), 'foobar', False, None, False, False, False, False), 340 ((0, 9), 'foobar', True, Errors.UnknownMemberIdError(), True, True, True, False), 341 ((0, 9), 'foobar', True, Errors.IllegalGenerationError(), True, True, True, False), 342 ((0, 9), 'foobar', True, Errors.RebalanceInProgressError(), True, True, True, False), 343 ((0, 9), 'foobar', True, Exception(), True, True, False, True), 344 ((0, 9), 'foobar', True, None, True, True, False, False), 345 ((0, 9), None, True, None, False, False, True, False), 346 ]) 347def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, 348 error, has_auto_commit, commit_offsets, 349 warn, exc): 350 mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') 351 mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') 352 client = KafkaClient(api_version=api_version) 353 coordinator = ConsumerCoordinator(client, SubscriptionState(), 354 Metrics(), 355 api_version=api_version, 356 session_timeout_ms=30000, 357 max_poll_interval_ms=30000, 358 enable_auto_commit=enable, 359 group_id=group_id) 360 commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync', 361 side_effect=error) 362 if has_auto_commit: 363 assert coordinator.next_auto_commit_deadline is not None 364 else: 365 assert coordinator.next_auto_commit_deadline is None 366 367 assert coordinator._maybe_auto_commit_offsets_sync() is None 368 369 if has_auto_commit: 370 assert coordinator.next_auto_commit_deadline is not None 371 372 assert commit_sync.call_count == (1 if commit_offsets else 0) 373 assert mock_warn.call_count == (1 if warn else 0) 374 assert mock_exc.call_count == (1 if exc else 0) 375 376 377@pytest.fixture 378def patched_coord(mocker, coordinator): 379 coordinator._subscription.subscribe(topics=['foobar']) 380 mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) 381 coordinator.coordinator_id = 0 382 mocker.patch.object(coordinator, 'coordinator', return_value=0) 383 coordinator._generation = Generation(0, 'foobar', b'') 384 coordinator.state = MemberState.STABLE 385 coordinator.rejoin_needed = False 386 mocker.patch.object(coordinator, 'need_rejoin', return_value=False) 387 mocker.patch.object(coordinator._client, 'least_loaded_node', 388 return_value=1) 389 mocker.patch.object(coordinator._client, 'ready', return_value=True) 390 mocker.patch.object(coordinator._client, 'send') 391 mocker.patch.object(coordinator, '_heartbeat_thread') 392 mocker.spy(coordinator, '_failed_request') 393 mocker.spy(coordinator, '_handle_offset_commit_response') 394 mocker.spy(coordinator, '_handle_offset_fetch_response') 395 return coordinator 396 397 398def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): 399 patched_coord.coordinator_unknown.return_value = True 400 patched_coord.coordinator_id = None 401 patched_coord.coordinator.return_value = None 402 403 # No offsets 404 ret = patched_coord._send_offset_commit_request({}) 405 assert isinstance(ret, Future) 406 assert ret.succeeded() 407 408 # No coordinator 409 ret = patched_coord._send_offset_commit_request(offsets) 410 assert ret.failed() 411 assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError) 412 413 414@pytest.mark.parametrize('api_version,req_type', [ 415 ((0, 8, 1), OffsetCommitRequest[0]), 416 ((0, 8, 2), OffsetCommitRequest[1]), 417 ((0, 9), OffsetCommitRequest[2])]) 418def test_send_offset_commit_request_versions(patched_coord, offsets, 419 api_version, req_type): 420 expect_node = 0 421 patched_coord.config['api_version'] = api_version 422 423 patched_coord._send_offset_commit_request(offsets) 424 (node, request), _ = patched_coord._client.send.call_args 425 assert node == expect_node, 'Unexpected coordinator node' 426 assert isinstance(request, req_type) 427 428 429def test_send_offset_commit_request_failure(patched_coord, offsets): 430 _f = Future() 431 patched_coord._client.send.return_value = _f 432 future = patched_coord._send_offset_commit_request(offsets) 433 (node, request), _ = patched_coord._client.send.call_args 434 error = Exception() 435 _f.failure(error) 436 patched_coord._failed_request.assert_called_with(0, request, future, error) 437 assert future.failed() 438 assert future.exception is error 439 440 441def test_send_offset_commit_request_success(mocker, patched_coord, offsets): 442 _f = Future() 443 patched_coord._client.send.return_value = _f 444 future = patched_coord._send_offset_commit_request(offsets) 445 (node, request), _ = patched_coord._client.send.call_args 446 response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])]) 447 _f.success(response) 448 patched_coord._handle_offset_commit_response.assert_called_with( 449 offsets, future, mocker.ANY, response) 450 451 452@pytest.mark.parametrize('response,error,dead', [ 453 (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]), 454 Errors.GroupAuthorizationFailedError, False), 455 (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]), 456 Errors.OffsetMetadataTooLargeError, False), 457 (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]), 458 Errors.InvalidCommitOffsetSizeError, False), 459 (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]), 460 Errors.GroupLoadInProgressError, False), 461 (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]), 462 Errors.GroupCoordinatorNotAvailableError, True), 463 (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]), 464 Errors.NotCoordinatorForGroupError, True), 465 (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]), 466 Errors.RequestTimedOutError, True), 467 (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]), 468 Errors.CommitFailedError, False), 469 (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]), 470 Errors.CommitFailedError, False), 471 (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]), 472 Errors.CommitFailedError, False), 473 (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]), 474 Errors.InvalidTopicError, False), 475 (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), 476 Errors.TopicAuthorizationFailedError, False), 477]) 478def test_handle_offset_commit_response(mocker, patched_coord, offsets, 479 response, error, dead): 480 future = Future() 481 patched_coord._handle_offset_commit_response(offsets, future, time.time(), 482 response) 483 assert isinstance(future.exception, error) 484 assert patched_coord.coordinator_id is (None if dead else 0) 485 486 487@pytest.fixture 488def partitions(): 489 return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)] 490 491 492def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): 493 patched_coord.coordinator_unknown.return_value = True 494 patched_coord.coordinator_id = None 495 patched_coord.coordinator.return_value = None 496 497 # No partitions 498 ret = patched_coord._send_offset_fetch_request([]) 499 assert isinstance(ret, Future) 500 assert ret.succeeded() 501 assert ret.value == {} 502 503 # No coordinator 504 ret = patched_coord._send_offset_fetch_request(partitions) 505 assert ret.failed() 506 assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError) 507 508 509@pytest.mark.parametrize('api_version,req_type', [ 510 ((0, 8, 1), OffsetFetchRequest[0]), 511 ((0, 8, 2), OffsetFetchRequest[1]), 512 ((0, 9), OffsetFetchRequest[1])]) 513def test_send_offset_fetch_request_versions(patched_coord, partitions, 514 api_version, req_type): 515 # assuming fixture sets coordinator=0, least_loaded_node=1 516 expect_node = 0 517 patched_coord.config['api_version'] = api_version 518 519 patched_coord._send_offset_fetch_request(partitions) 520 (node, request), _ = patched_coord._client.send.call_args 521 assert node == expect_node, 'Unexpected coordinator node' 522 assert isinstance(request, req_type) 523 524 525def test_send_offset_fetch_request_failure(patched_coord, partitions): 526 _f = Future() 527 patched_coord._client.send.return_value = _f 528 future = patched_coord._send_offset_fetch_request(partitions) 529 (node, request), _ = patched_coord._client.send.call_args 530 error = Exception() 531 _f.failure(error) 532 patched_coord._failed_request.assert_called_with(0, request, future, error) 533 assert future.failed() 534 assert future.exception is error 535 536 537def test_send_offset_fetch_request_success(patched_coord, partitions): 538 _f = Future() 539 patched_coord._client.send.return_value = _f 540 future = patched_coord._send_offset_fetch_request(partitions) 541 (node, request), _ = patched_coord._client.send.call_args 542 response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]) 543 _f.success(response) 544 patched_coord._handle_offset_fetch_response.assert_called_with( 545 future, response) 546 547 548@pytest.mark.parametrize('response,error,dead', [ 549 (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), 550 Errors.GroupLoadInProgressError, False), 551 (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), 552 Errors.NotCoordinatorForGroupError, True), 553 (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), 554 Errors.UnknownMemberIdError, False), 555 (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), 556 Errors.IllegalGenerationError, False), 557 (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), 558 Errors.TopicAuthorizationFailedError, False), 559 (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), 560 None, False), 561]) 562def test_handle_offset_fetch_response(patched_coord, offsets, 563 response, error, dead): 564 future = Future() 565 patched_coord._handle_offset_fetch_response(future, response) 566 if error is not None: 567 assert isinstance(future.exception, error) 568 else: 569 assert future.succeeded() 570 assert future.value == offsets 571 assert patched_coord.coordinator_id is (None if dead else 0) 572 573 574def test_heartbeat(mocker, patched_coord): 575 heartbeat = HeartbeatThread(patched_coord) 576 577 assert not heartbeat.enabled and not heartbeat.closed 578 579 heartbeat.enable() 580 assert heartbeat.enabled 581 582 heartbeat.disable() 583 assert not heartbeat.enabled 584 585 # heartbeat disables when un-joined 586 heartbeat.enable() 587 patched_coord.state = MemberState.UNJOINED 588 heartbeat._run_once() 589 assert not heartbeat.enabled 590 591 heartbeat.enable() 592 patched_coord.state = MemberState.STABLE 593 mocker.spy(patched_coord, '_send_heartbeat_request') 594 mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True) 595 heartbeat._run_once() 596 assert patched_coord._send_heartbeat_request.call_count == 1 597 598 heartbeat.close() 599 assert heartbeat.closed 600 601 602def test_lookup_coordinator_failure(mocker, coordinator): 603 604 mocker.patch.object(coordinator, '_send_group_coordinator_request', 605 return_value=Future().failure(Exception('foobar'))) 606 future = coordinator.lookup_coordinator() 607 assert future.failed() 608 609 610def test_ensure_active_group(mocker, coordinator): 611 coordinator._subscription.subscribe(topics=['foobar']) 612 mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) 613 mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True)) 614 mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, False]) 615 mocker.patch.object(coordinator, '_on_join_complete') 616 mocker.patch.object(coordinator, '_heartbeat_thread') 617 618 coordinator.ensure_active_group() 619 620 coordinator._send_join_group_request.assert_called_once_with() 621