1# pylint: skip-file
2from __future__ import absolute_import
3import time
4
5import pytest
6
7from kafka.client_async import KafkaClient
8from kafka.consumer.subscription_state import (
9    SubscriptionState, ConsumerRebalanceListener)
10from kafka.coordinator.assignors.range import RangePartitionAssignor
11from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
12from kafka.coordinator.base import Generation, MemberState, HeartbeatThread
13from kafka.coordinator.consumer import ConsumerCoordinator
14from kafka.coordinator.protocol import (
15    ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
16import kafka.errors as Errors
17from kafka.future import Future
18from kafka.metrics import Metrics
19from kafka.protocol.commit import (
20    OffsetCommitRequest, OffsetCommitResponse,
21    OffsetFetchRequest, OffsetFetchResponse)
22from kafka.protocol.metadata import MetadataResponse
23from kafka.structs import TopicPartition, OffsetAndMetadata
24from kafka.util import WeakMethod
25
26
@pytest.fixture
def client(conn):
    """Provide a ``KafkaClient`` pinned to api_version (0, 9).

    The ``conn`` fixture is requested but never used directly; it is defined
    outside this file -- presumably it stubs broker connections (confirm in
    conftest).
    """
    kafka_client = KafkaClient(api_version=(0, 9))
    return kafka_client
30
@pytest.fixture
def coordinator(client):
    """Provide a ``ConsumerCoordinator`` built on ``client`` with no config overrides."""
    subscription = SubscriptionState()
    metrics = Metrics()
    return ConsumerCoordinator(client, subscription, metrics)
34
35
def test_init(client, coordinator):
    """A new coordinator flags a metadata update and registers its metadata listener."""
    cluster = client.cluster
    # metadata update on init
    assert cluster._need_update is True
    metadata_listener = WeakMethod(coordinator._handle_metadata_update)
    assert metadata_listener in cluster._listeners
40
41
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(client, api_version):
    """enable_auto_commit is forced off for api versions below (0, 8, 1)."""
    overrides = dict(
        enable_auto_commit=True,
        # session_timeout_ms and max_poll_interval_ms should be the same
        # to avoid KafkaConfigurationError
        session_timeout_ms=30000,
        max_poll_interval_ms=30000,
        group_id='foobar',
        api_version=api_version)
    coordinator = ConsumerCoordinator(
        client, SubscriptionState(), Metrics(), **overrides)

    too_old = api_version < (0, 8, 1)
    expected = not too_old
    assert coordinator.config['enable_auto_commit'] is expected
55
56
def test_protocol_type(coordinator):
    """The consumer coordinator reports 'consumer' as its protocol type."""
    # Compare by value: identity against a string literal depends on
    # interning and triggers a SyntaxWarning on newer Python versions.
    assert coordinator.protocol_type() == 'consumer'
59
60
def test_group_protocols(coordinator):
    """group_protocols() needs a subscription, then lists metadata per assignor."""
    # Requires a subscription
    with pytest.raises(Errors.IllegalStateError):
        coordinator.group_protocols()

    coordinator._subscription.subscribe(topics=['foobar'])
    assert coordinator.group_protocols() == [
        ('range', ConsumerProtocolMemberMetadata(
            RangePartitionAssignor.version,
            ['foobar'],
            b'')),
        ('roundrobin', ConsumerProtocolMemberMetadata(
            RoundRobinPartitionAssignor.version,
            ['foobar'],
            b'')),
    ]
81
82
@pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_pattern_subscription(coordinator, api_version):
    """A pattern subscription picks up matching topics from a metadata update."""
    coordinator.config['api_version'] = api_version
    subscription = coordinator._subscription
    subscription.subscribe(pattern='foo')
    assert subscription.subscription == set()
    current_snapshot = coordinator._metadata_snapshot
    assert current_snapshot == coordinator._build_metadata_snapshot(subscription, {})

    brokers = [(0, 'foo', 12), (1, 'bar', 34)]
    topics = [
        (0, 'fizz', []),
        (0, 'foo1', [(0, 0, 0, [], [])]),
        (0, 'foo2', [(0, 0, 1, [], [])]),
    ]
    coordinator._client.cluster.update_metadata(MetadataResponse[0](brokers, topics))
    assert subscription.subscription == {'foo1', 'foo2'}

    if api_version >= (0, 9):
        # 0.9 consumers should trigger dynamic partition assignment
        assert subscription.assignment == {}
    else:
        # earlier consumers get all partitions assigned locally
        expected_partitions = {TopicPartition('foo1', 0), TopicPartition('foo2', 0)}
        assert set(subscription.assignment.keys()) == expected_partitions
109
110
def test_lookup_assignor(coordinator):
    """_lookup_assignor maps a protocol name to its assignor class, or None if unknown."""
    expectations = [
        ('roundrobin', RoundRobinPartitionAssignor),
        ('range', RangePartitionAssignor),
        ('foobar', None),
    ]
    for protocol_name, expected in expectations:
        assert coordinator._lookup_assignor(protocol_name) is expected
115
116
def test_join_complete(mocker, coordinator):
    """_on_join_complete hands the decoded assignment to the chosen assignor."""
    coordinator._subscription.subscribe(topics=['foobar'])
    round_robin = RoundRobinPartitionAssignor()
    coordinator.config['assignors'] = (round_robin,)
    mocker.spy(round_robin, 'on_assignment')
    assert round_robin.on_assignment.call_count == 0

    member_assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
    encoded = member_assignment.encode()
    coordinator._on_join_complete(0, 'member-foo', 'roundrobin', encoded)

    assert round_robin.on_assignment.call_count == 1
    round_robin.on_assignment.assert_called_with(member_assignment)
128
129
def test_subscription_listener(mocker, coordinator):
    """The rebalance listener is told about revoked, then assigned, partitions."""
    rebalance_listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
    coordinator._subscription.subscribe(topics=['foobar'],
                                        listener=rebalance_listener)

    coordinator._on_join_prepare(0, 'member-foo')
    revoked = rebalance_listener.on_partitions_revoked
    assert revoked.call_count == 1
    revoked.assert_called_with(set())

    member_assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
    encoded = member_assignment.encode()
    coordinator._on_join_complete(0, 'member-foo', 'roundrobin', encoded)
    assigned = rebalance_listener.on_partitions_assigned
    assert assigned.call_count == 1
    assigned.assert_called_with(
        {TopicPartition('foobar', 0), TopicPartition('foobar', 1)})
147
148
def test_subscription_listener_failure(mocker, coordinator):
    """A crashing rebalance listener does not abort the join callbacks."""
    rebalance_listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
    coordinator._subscription.subscribe(topics=['foobar'],
                                        listener=rebalance_listener)

    # exception raised in listener should not be re-raised by coordinator
    revoked = rebalance_listener.on_partitions_revoked
    revoked.side_effect = Exception('crash')
    coordinator._on_join_prepare(0, 'member-foo')
    assert revoked.call_count == 1

    member_assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
    encoded = member_assignment.encode()
    coordinator._on_join_complete(0, 'member-foo', 'roundrobin', encoded)
    assert rebalance_listener.on_partitions_assigned.call_count == 1
164
165
def test_perform_assignment(mocker, coordinator):
    """_perform_assignment passes decoded member metadata to the named assignor."""
    metadata_by_member = {
        'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''),
        'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b''),
    }
    expected_assignments = {
        'member-foo': ConsumerProtocolMemberAssignment(0, [('foo1', [0])], b''),
        'member-bar': ConsumerProtocolMemberAssignment(0, [('foo1', [1])], b''),
    }

    # stub the assignor so the test controls what the coordinator receives
    assign = mocker.patch.object(RoundRobinPartitionAssignor, 'assign',
                                 return_value=expected_assignments)

    encoded_members = []
    for member_id, metadata in metadata_by_member.items():
        encoded_members.append((member_id, metadata.encode()))

    result = coordinator._perform_assignment(
        'member-foo', 'roundrobin', encoded_members)

    assert assign.call_count == 1
    assign.assert_called_with(coordinator._client.cluster, metadata_by_member)
    assert result == expected_assignments
190
191
def test_on_join_prepare(coordinator):
    """Smoke test: _on_join_prepare runs without raising for a topic subscription."""
    subscription = coordinator._subscription
    subscription.subscribe(topics=['foobar'])
    coordinator._on_join_prepare(0, 'member-foo')
195
196
def test_need_rejoin(coordinator):
    """need_rejoin() is False without a subscription and True after subscribing."""
    # No subscription - no rejoin
    before_subscribe = coordinator.need_rejoin()
    assert before_subscribe is False

    coordinator._subscription.subscribe(topics=['foobar'])
    after_subscribe = coordinator.need_rejoin()
    assert after_subscribe is True
203
204
def test_refresh_committed_offsets_if_needed(mocker, coordinator):
    """Committed offsets are applied only to partitions that are assigned."""
    assigned_tp = TopicPartition('foobar', 0)
    unassigned_tp = TopicPartition('foobar', 1)
    committed = {
        assigned_tp: OffsetAndMetadata(123, b''),
        unassigned_tp: OffsetAndMetadata(234, b''),
    }
    mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
                        return_value=committed)

    subscription = coordinator._subscription
    subscription.assign_from_user([assigned_tp])
    assert subscription.needs_fetch_committed_offsets is True

    coordinator.refresh_committed_offsets_if_needed()

    current = subscription.assignment
    assert current[assigned_tp].committed == 123
    assert unassigned_tp not in current
    assert subscription.needs_fetch_committed_offsets is False
217
218
def test_fetch_committed_offsets(mocker, coordinator):
    """fetch_committed_offsets polls once per attempt, retrying only retriable errors."""

    # No partitions, no IO polling
    mocker.patch.object(coordinator._client, 'poll')
    assert coordinator.fetch_committed_offsets([]) == {}
    assert coordinator._client.poll.call_count == 0

    # general case -- send offset fetch request, get successful future
    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
    mocker.patch.object(coordinator, '_send_offset_fetch_request',
                        return_value=Future().success('foobar'))
    partitions = [TopicPartition('foobar', 0)]
    ret = coordinator.fetch_committed_offsets(partitions)
    assert ret == 'foobar'
    coordinator._send_offset_fetch_request.assert_called_with(partitions)
    assert coordinator._client.poll.call_count == 1

    # Failed future is raised if not retriable
    coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError)
    coordinator._client.poll.reset_mock()
    with pytest.raises(AssertionError):
        coordinator.fetch_committed_offsets(partitions)
    assert coordinator._client.poll.call_count == 1

    # A timed-out request is followed by a second attempt
    coordinator._client.poll.reset_mock()
    coordinator._send_offset_fetch_request.side_effect = [
        Future().failure(Errors.RequestTimedOutError),
        Future().success('fizzbuzz')]

    ret = coordinator.fetch_committed_offsets(partitions)
    assert ret == 'fizzbuzz'
    assert coordinator._client.poll.call_count == 2 # call + retry
255
256
def test_close(mocker, coordinator):
    """close() attempts one auto-commit, handles the leave-group response and resets state."""
    for stubbed in ('_maybe_auto_commit_offsets_sync',
                    '_handle_leave_group_response'):
        mocker.patch.object(coordinator, stubbed)
    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)

    # put the coordinator into a joined, stable state before closing
    coordinator.coordinator_id = 0
    coordinator._generation = Generation(1, 'foobar', b'')
    coordinator.state = MemberState.STABLE

    kafka_client = coordinator._client
    leave_response = Future().success('foobar')
    mocker.patch.object(kafka_client, 'send', return_value=leave_response)
    mocker.patch.object(kafka_client, 'poll')

    coordinator.close()

    assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1
    coordinator._handle_leave_group_response.assert_called_with('foobar')

    assert coordinator.generation() is None
    assert coordinator._generation is Generation.NO_GENERATION
    assert coordinator.state is MemberState.UNJOINED
    assert coordinator.rejoin_needed is True
276
277
@pytest.fixture
def offsets():
    """Provide committed offsets for partitions 0 and 1 of topic 'foobar'."""
    committed = {}
    committed[TopicPartition('foobar', 0)] = OffsetAndMetadata(123, b'')
    committed[TopicPartition('foobar', 1)] = OffsetAndMetadata(234, b'')
    return committed
284
285
def test_commit_offsets_async(mocker, coordinator, offsets):
    """commit_offsets_async sends exactly one offset commit request."""
    mocker.patch.object(coordinator._client, 'poll')
    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
    commit_result = Future().success('fizzbuzz')
    send_commit = mocker.patch.object(coordinator, '_send_offset_commit_request',
                                      return_value=commit_result)

    coordinator.commit_offsets_async(offsets)

    assert send_commit.call_count == 1
294
295
def test_commit_offsets_sync(mocker, coordinator, offsets):
    """commit_offsets_sync polls once per attempt, retrying only retriable errors."""
    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
    mocker.patch.object(coordinator, '_send_offset_commit_request',
                        return_value=Future().success('fizzbuzz'))
    cli = coordinator._client
    mocker.patch.object(cli, 'poll')

    # No offsets, no calls
    assert coordinator.commit_offsets_sync({}) is None
    assert coordinator._send_offset_commit_request.call_count == 0
    assert cli.poll.call_count == 0

    ret = coordinator.commit_offsets_sync(offsets)
    assert coordinator._send_offset_commit_request.call_count == 1
    assert cli.poll.call_count == 1
    assert ret == 'fizzbuzz'

    # Failed future is raised if not retriable
    coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError)
    cli.poll.reset_mock()
    with pytest.raises(AssertionError):
        coordinator.commit_offsets_sync(offsets)
    assert cli.poll.call_count == 1

    # A timed-out request is followed by a second attempt
    cli.poll.reset_mock()
    coordinator._send_offset_commit_request.side_effect = [
        Future().failure(Errors.RequestTimedOutError),
        Future().success('fizzbuzz')]

    ret = coordinator.commit_offsets_sync(offsets)
    assert ret == 'fizzbuzz'
    assert cli.poll.call_count == 2 # call + retry
332
333
@pytest.mark.parametrize(
    'api_version,group_id,enable,error,has_auto_commit,commit_offsets,warn,exc', [
        ((0, 8, 0), 'foobar', True, None, False, False, True, False),
        ((0, 8, 1), 'foobar', True, None, True, True, False, False),
        ((0, 8, 2), 'foobar', True, None, True, True, False, False),
        ((0, 9), 'foobar', False, None, False, False, False, False),
        ((0, 9), 'foobar', True, Errors.UnknownMemberIdError(), True, True, True, False),
        ((0, 9), 'foobar', True, Errors.IllegalGenerationError(), True, True, True, False),
        ((0, 9), 'foobar', True, Errors.RebalanceInProgressError(), True, True, True, False),
        ((0, 9), 'foobar', True, Exception(), True, True, False, True),
        ((0, 9), 'foobar', True, None, True, True, False, False),
        ((0, 9), None, True, None, False, False, True, False),
    ])
def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
                                        error, has_auto_commit, commit_offsets,
                                        warn, exc):
    """Check when the sync auto-commit runs and what it logs.

    Each case states whether auto-commit ends up active, whether a commit is
    attempted, and whether a warning or an exception is logged.
    """
    warning_log = mocker.patch('kafka.coordinator.consumer.log.warning')
    exception_log = mocker.patch('kafka.coordinator.consumer.log.exception')
    kafka_client = KafkaClient(api_version=api_version)
    coordinator = ConsumerCoordinator(
        kafka_client, SubscriptionState(), Metrics(),
        api_version=api_version,
        session_timeout_ms=30000,
        max_poll_interval_ms=30000,
        enable_auto_commit=enable,
        group_id=group_id)
    commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
                                      side_effect=error)

    # a deadline is scheduled exactly when auto-commit is active
    deadline_is_set = coordinator.next_auto_commit_deadline is not None
    assert deadline_is_set == has_auto_commit

    assert coordinator._maybe_auto_commit_offsets_sync() is None

    if has_auto_commit:
        assert coordinator.next_auto_commit_deadline is not None

    assert commit_sync.call_count == int(commit_offsets)
    assert warning_log.call_count == int(warn)
    assert exception_log.call_count == int(exc)
375
376
@pytest.fixture
def patched_coord(mocker, coordinator):
    """Provide a coordinator stubbed as a stable member whose coordinator is node 0."""
    coordinator._subscription.subscribe(topics=['foobar'])

    # the group coordinator is reported as known, with node id 0
    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
    coordinator.coordinator_id = 0
    mocker.patch.object(coordinator, 'coordinator', return_value=0)

    # the member looks joined and does not need to rejoin
    coordinator._generation = Generation(0, 'foobar', b'')
    coordinator.state = MemberState.STABLE
    coordinator.rejoin_needed = False
    mocker.patch.object(coordinator, 'need_rejoin', return_value=False)

    # replace the client calls these tests touch
    kafka_client = coordinator._client
    mocker.patch.object(kafka_client, 'least_loaded_node', return_value=1)
    mocker.patch.object(kafka_client, 'ready', return_value=True)
    mocker.patch.object(kafka_client, 'send')

    mocker.patch.object(coordinator, '_heartbeat_thread')

    # spy on the handlers so tests can assert on their calls
    for handler_name in ('_failed_request',
                         '_handle_offset_commit_response',
                         '_handle_offset_fetch_response'):
        mocker.spy(coordinator, handler_name)
    return coordinator
396
397
def test_send_offset_commit_request_fail(mocker, patched_coord, offsets):
    """With no coordinator, an empty commit succeeds and a real commit fails."""
    patched_coord.coordinator_unknown.return_value = True
    patched_coord.coordinator_id = None
    patched_coord.coordinator.return_value = None

    # No offsets
    empty_commit = patched_coord._send_offset_commit_request({})
    assert isinstance(empty_commit, Future)
    assert empty_commit.succeeded()

    # No coordinator
    real_commit = patched_coord._send_offset_commit_request(offsets)
    assert real_commit.failed()
    assert isinstance(real_commit.exception,
                      Errors.GroupCoordinatorNotAvailableError)
412
413
@pytest.mark.parametrize('api_version,req_type', [
    ((0, 8, 1), OffsetCommitRequest[0]),
    ((0, 8, 2), OffsetCommitRequest[1]),
    ((0, 9), OffsetCommitRequest[2])])
def test_send_offset_commit_request_versions(patched_coord, offsets,
                                             api_version, req_type):
    """The OffsetCommitRequest version sent depends on the configured api_version."""
    patched_coord.config['api_version'] = api_version
    coordinator_node = 0

    patched_coord._send_offset_commit_request(offsets)

    sent_node, sent_request = patched_coord._client.send.call_args[0]
    assert sent_node == coordinator_node, 'Unexpected coordinator node'
    assert isinstance(sent_request, req_type)
427
428
def test_send_offset_commit_request_failure(patched_coord, offsets):
    """A failed send reaches _failed_request and fails the returned future."""
    send_future = Future()
    patched_coord._client.send.return_value = send_future
    commit_future = patched_coord._send_offset_commit_request(offsets)
    _node, sent_request = patched_coord._client.send.call_args[0]

    failure = Exception()
    send_future.failure(failure)

    patched_coord._failed_request.assert_called_with(
        0, sent_request, commit_future, failure)
    assert commit_future.failed()
    assert commit_future.exception is failure
439
440
def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
    """A successful send passes the response to _handle_offset_commit_response."""
    send_future = Future()
    patched_coord._client.send.return_value = send_future
    commit_future = patched_coord._send_offset_commit_request(offsets)
    _node, _request = patched_coord._client.send.call_args[0]

    commit_response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])
    send_future.success(commit_response)

    handler = patched_coord._handle_offset_commit_response
    handler.assert_called_with(offsets, commit_future, mocker.ANY, commit_response)
450
451
@pytest.mark.parametrize('response,error,dead', [
    (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]),
     Errors.GroupAuthorizationFailedError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]),
     Errors.OffsetMetadataTooLargeError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]),
     Errors.InvalidCommitOffsetSizeError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]),
     Errors.GroupLoadInProgressError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]),
     Errors.GroupCoordinatorNotAvailableError, True),
    (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]),
     Errors.NotCoordinatorForGroupError, True),
    (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]),
     Errors.RequestTimedOutError, True),
    (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]),
     Errors.CommitFailedError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]),
     Errors.CommitFailedError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]),
     Errors.CommitFailedError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]),
     Errors.InvalidTopicError, False),
    (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
     Errors.TopicAuthorizationFailedError, False),
])
def test_handle_offset_commit_response(mocker, patched_coord, offsets,
                                       response, error, dead):
    """Each commit error code fails the future with the matching exception.

    ``dead`` says whether the error is expected to clear the known
    coordinator (``coordinator_id`` becomes None) or leave it at node 0.
    """
    future = Future()
    patched_coord._handle_offset_commit_response(offsets, future, time.time(),
                                                 response)
    assert isinstance(future.exception, error)
    # Compare the node id by value; identity against the int 0 only works
    # through CPython's small-integer cache.
    if dead:
        assert patched_coord.coordinator_id is None
    else:
        assert patched_coord.coordinator_id == 0
485
486
@pytest.fixture
def partitions():
    """Provide partitions 0 and 1 of topic 'foobar'."""
    return [TopicPartition('foobar', partition) for partition in (0, 1)]
490
491
def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions):
    """With no coordinator, an empty fetch yields {} and a real fetch fails."""
    patched_coord.coordinator_unknown.return_value = True
    patched_coord.coordinator_id = None
    patched_coord.coordinator.return_value = None

    # No partitions
    empty_fetch = patched_coord._send_offset_fetch_request([])
    assert isinstance(empty_fetch, Future)
    assert empty_fetch.succeeded()
    assert empty_fetch.value == {}

    # No coordinator
    real_fetch = patched_coord._send_offset_fetch_request(partitions)
    assert real_fetch.failed()
    assert isinstance(real_fetch.exception,
                      Errors.GroupCoordinatorNotAvailableError)
507
508
@pytest.mark.parametrize('api_version,req_type', [
    ((0, 8, 1), OffsetFetchRequest[0]),
    ((0, 8, 2), OffsetFetchRequest[1]),
    ((0, 9), OffsetFetchRequest[1])])
def test_send_offset_fetch_request_versions(patched_coord, partitions,
                                            api_version, req_type):
    """The OffsetFetchRequest version sent depends on the configured api_version."""
    patched_coord.config['api_version'] = api_version
    # assuming fixture sets coordinator=0, least_loaded_node=1
    coordinator_node = 0

    patched_coord._send_offset_fetch_request(partitions)

    sent_node, sent_request = patched_coord._client.send.call_args[0]
    assert sent_node == coordinator_node, 'Unexpected coordinator node'
    assert isinstance(sent_request, req_type)
523
524
def test_send_offset_fetch_request_failure(patched_coord, partitions):
    """A failed send reaches _failed_request and fails the returned future."""
    send_future = Future()
    patched_coord._client.send.return_value = send_future
    fetch_future = patched_coord._send_offset_fetch_request(partitions)
    _node, sent_request = patched_coord._client.send.call_args[0]

    failure = Exception()
    send_future.failure(failure)

    patched_coord._failed_request.assert_called_with(
        0, sent_request, fetch_future, failure)
    assert fetch_future.failed()
    assert fetch_future.exception is failure
535
536
def test_send_offset_fetch_request_success(patched_coord, partitions):
    """A successful send passes the response to _handle_offset_fetch_response."""
    send_future = Future()
    patched_coord._client.send.return_value = send_future
    fetch_future = patched_coord._send_offset_fetch_request(partitions)
    _node, _request = patched_coord._client.send.call_args[0]

    partition_data = [(0, 123, b'', 0), (1, 234, b'', 0)]
    fetch_response = OffsetFetchResponse[0]([('foobar', partition_data)])
    send_future.success(fetch_response)

    handler = patched_coord._handle_offset_fetch_response
    handler.assert_called_with(fetch_future, fetch_response)
546
547
@pytest.mark.parametrize('response,error,dead', [
    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
     Errors.GroupLoadInProgressError, False),
    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
     Errors.NotCoordinatorForGroupError, True),
    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
     Errors.UnknownMemberIdError, False),
    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
     Errors.IllegalGenerationError, False),
    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
     Errors.TopicAuthorizationFailedError, False),
    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
     None, False),
])
def test_handle_offset_fetch_response(patched_coord, offsets,
                                      response, error, dead):
    """Fetch error codes fail the future; a clean response yields the offsets.

    ``dead`` says whether the error is expected to clear the known
    coordinator (``coordinator_id`` becomes None) or leave it at node 0.
    """
    future = Future()
    patched_coord._handle_offset_fetch_response(future, response)
    if error is not None:
        assert isinstance(future.exception, error)
    else:
        assert future.succeeded()
        assert future.value == offsets
    # Compare the node id by value; identity against the int 0 only works
    # through CPython's small-integer cache.
    if dead:
        assert patched_coord.coordinator_id is None
    else:
        assert patched_coord.coordinator_id == 0
572
573
def test_heartbeat(mocker, patched_coord):
    """Exercise HeartbeatThread enable/disable, one run iteration, and close."""
    thread = HeartbeatThread(patched_coord)

    # starts out neither enabled nor closed
    assert not thread.enabled
    assert not thread.closed

    thread.enable()
    assert thread.enabled

    thread.disable()
    assert not thread.enabled

    # heartbeat disables when un-joined
    thread.enable()
    patched_coord.state = MemberState.UNJOINED
    thread._run_once()
    assert not thread.enabled

    # a stable member with a heartbeat due sends one request
    thread.enable()
    patched_coord.state = MemberState.STABLE
    mocker.spy(patched_coord, '_send_heartbeat_request')
    mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat',
                        return_value=True)
    thread._run_once()
    assert patched_coord._send_heartbeat_request.call_count == 1

    thread.close()
    assert thread.closed
600
601
def test_lookup_coordinator_failure(mocker, coordinator):
    """lookup_coordinator returns a failed future when the coordinator request fails."""
    failed_request = Future().failure(Exception('foobar'))
    mocker.patch.object(coordinator, '_send_group_coordinator_request',
                        return_value=failed_request)

    lookup = coordinator.lookup_coordinator()

    assert lookup.failed()
608
609
def test_ensure_active_group(mocker, coordinator):
    """ensure_active_group sends one join request when a rejoin is needed once."""
    coordinator._subscription.subscribe(topics=['foobar'])
    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
    join_result = Future().success(True)
    send_join = mocker.patch.object(coordinator, '_send_join_group_request',
                                    return_value=join_result)
    # need_rejoin answers True on the first call and False on the second
    mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, False])
    mocker.patch.object(coordinator, '_on_join_complete')
    mocker.patch.object(coordinator, '_heartbeat_thread')

    coordinator.ensure_active_group()

    send_join.assert_called_once_with()
621